Commit 7bc58efd authored by Tim's avatar Tim
Browse files

init

parents
# Byte-compiled / optimized / DLL files
__pycache__/
__pycache__
*.py[cod]
*$py.class
# C extensions
*.so
scratchbook.txt
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
data/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don’t work, or not
# install all needed dependencies.
#Pipfile.lock
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# ptan - Please take a number
ptan is dead simple, central ticket ("ticket" as "sequencial int numbers") generator for distributed systems.
The Server is non blocking singlethreaded (powered by thriftpy2 and asyncio)
# Install
`pip3 install git+https://git.connect.dzd-ev.de/dzdpythonmodules/ptan.git`
# Getting started
```python
from ptan.server import Server
print("Starting Server")
server = Server()
# We start the server with 'run_in_dedicated_thread' = True, otherwise the script will not continue as it will loop into the server thread
# Use run_in_dedicated_thread = False, if you are building a standalone server
server.run(run_in_dedicated_thread=True)
from ptan.client import Client
print("Create Client")
c = Client()
print(c.getNextID())
print(c.getNextID())
# Start a new ID sequence with a new namespace
print(c.getNextID("someNewNamespace"))
# Return to the default sequence
print(c.getNextID())
print(c.getNextID("someNewNamespace"))
print("#Shutdown Server!")
server.stop()
```
For more advanced examples, have look at the test*.py-files at
https://git.connect.dzd-ev.de/dzdtools/pythonmodules/tree/master/ptan
\ No newline at end of file
import time
import random
import asyncio
import datetime
from thriftpy2.protocol import TCyBinaryProtocolFactory
from thriftpy2.transport import TCyBufferedTransportFactory
from ptan.shared.ptanThriftService import ptanThriftService
from thriftpy2.rpc import client_context
from thriftpy2.transport import TTransportException
import socket
class PtanServerResultEmptyException(Exception):
pass
class Client:
def __init__(self, host="127.0.0.1", port=10932):
self.service = ptanThriftService.ptanService
self.host = host
self.port = port
self.proto_factory = TCyBinaryProtocolFactory()
self.trans_factory = TCyBufferedTransportFactory()
self.timeout = 1000
self.trys = 3
self.loop = asyncio.new_event_loop()
self.server_time_stamp_multiplier = None
def _makeRequest(self, requestname, *args):
try_count = 0
for tr in range(0, self.trys):
try:
with client_context(
self.service,
self.host,
self.port,
connect_timeout=self.timeout,
proto_factory=TCyBinaryProtocolFactory(),
trans_factory=TCyBufferedTransportFactory(),
) as client:
res = getattr(client, requestname)(*args)
if res is None:
raise PtanServerResultEmptyException
return res
except (TTransportException, socket.error, PtanServerResultEmptyException):
if try_count == self.trys:
raise
time.sleep(random.randint(1, 1000) / 1000.0)
try_count = try_count + 1
def extractDateFromTimestampedID(self, id):
if self.server_time_stamp_multiplier is None:
self.server_time_stamp_multiplier = self.getServerConfTimestampMultiplier()
id_str = str(id)
timestamp_base_length = 10 # 1571659799 -> len(str(int(time.time() * 1)))
timestamp_length = (
len(str(self.server_time_stamp_multiplier)) - 1 + timestamp_base_length
)
if len(id) < timestamp_length:
raise ValueError(
"ID '{}' is too short to be a timestamped ID. The server does not provide timestamped ids probably."
)
timestamp_substr = id[:timestamp_length]
return datetime.datetime.fromtimestamp(
int(timestamp_substr) / self.server_time_stamp_multiplier
)
def getServerConfIsTimestamped(self):
return self._makeRequest("getConfIsTimestamped")
def getServerConfTimestampMultiplier(self):
return self._makeRequest("getConfTimestampMultiplier")
def getNextID(self, namespace=""):
return self._makeRequest("getNextID", namespace)
def ping_server(self):
return self._makeRequest("ping")
import time
import asyncio
import os
import logging
import json
log = logging.getLogger("ptan")
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
class SingletonMetaClass(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(SingletonMetaClass, cls).__call__(
*args, **kwargs
)
return cls._instances[cls]
class SequenceManager(metaclass=SingletonMetaClass):
namespaces_seq_state = [{}]
conf_default_first_seq = None
seq_step_size = None
conf_add_timestamp = None
persistence = None
def __init__(
self,
server,
namespaces_seq_state: "[{'namespace':str,'seq'int}]" = None,
default_first_seq=1,
timestamped_seqs=False,
default_step_size=1,
persistence=False,
):
self._server = server
self.persistence = persistence
self.conf_default_first_seq = default_first_seq
self.conf_add_timestamp = timestamped_seqs
self.seq_step_size = default_step_size
if namespaces_seq_state is None:
# create default namespace 'None'
self.namespaces_seq_state = [
{"namespace": None, "seq": self.conf_default_first_seq}
]
# Timestamp multiplier. If timestamped_seqs = True this determines how many decimal points are taken into account.
# e.g. 1000000: Microseconds (2019-10-21T13:55:16.4934), 1000: Miliseconds (2019-10-21T14:05:04.956) , 10: tenth of a second (2019-10-21T14:03:15.3)
self.conf_timestamp_multiplier = 10000
async def ping(self):
log.info("Server reads PING")
return "Pong"
async def getConfTimestampMultiplier(self):
if self.conf_add_timestamp is False:
return None
return self.conf_timestamp_multiplier
async def getConfIsTimestamped(self):
return self.conf_add_timestamp
async def getNextID(self, namespace: str = None):
if namespace == "":
namespace = None
if self.conf_add_timestamp:
nid = int(
str(int(time.time() * self.conf_timestamp_multiplier))
+ str(self._get_seq(namespace))
)
else:
nid = self._get_seq(namespace)
return str(nid)
def _get_seq(self, namespace: str):
# print(self.namespaces_seq_state)
seq = None
i = 0
for i, namesp_seq_dict in enumerate(self.namespaces_seq_state):
if namesp_seq_dict["namespace"] == namespace:
seq = namesp_seq_dict["seq"]
break
if seq is None:
i = self._create_namespace(namespace)
seq = self.namespaces_seq_state[i]["seq"]
if self.persistence:
self._server._save_persistence()
self._increment_seq(i)
return seq
def _create_namespace(self, name):
self.namespaces_seq_state.append(
{"namespace": name, "seq": self.conf_default_first_seq}
)
# return index of new namespace
return len(self.namespaces_seq_state) - 1
def _increment_seq(self, namespace_index: str):
self.namespaces_seq_state[namespace_index]["seq"] = (
self.namespaces_seq_state[namespace_index]["seq"] + self.seq_step_size
)
class Server(metaclass=SingletonMetaClass):
class persistent_modes(object):
# the Server wont save any state. all sequences will be deleted/lost on shutdown. Best perfomance
none = None
# The server will store and load sequences when shutdown gracefully and start. Good perfomance but sequences wont survive a crash
on_start_load = "onsl"
# On every transaction, the server will store the new state of the sequences. This is the most secure option for persistence but at the expense of perfomance
on_transaction = "ont"
def __init__(
self,
port: int = 10932,
host="127.0.0.1",
default_first_seq=1,
timestamped_seqs=False,
default_step_size=1,
persistent_mode=persistent_modes.none,
persistence_storage_path="/var/lib/ptan/state.json",
):
from thriftpy2.protocol import TCyBinaryProtocolFactory
from thriftpy2.transport import TCyBufferedTransportFactory
from thriftpy2.thrift import TMultiplexedProcessor
from thriftpy2.server import TThreadedServer
from ptan.shared.ptanThriftService import ptanThriftService
self.persistent_mode = persistent_mode
if self.persistent_mode == self.persistent_modes.on_transaction:
sequence_manager_persistence = True
else:
sequence_manager_persistence = False
self.persistence_storage_path = persistence_storage_path
self.sequence_manager = SequenceManager(
self,
default_first_seq=default_first_seq,
timestamped_seqs=timestamped_seqs,
default_step_size=default_step_size,
persistence=sequence_manager_persistence,
)
self.service = ptanThriftService.ptanService
self.sequence_manager
self.host = host
self.port = port
self.proto_factory = TCyBinaryProtocolFactory()
self.trans_factory = TCyBufferedTransportFactory()
self.client_timeout = 4000
if self.persistent_mode is not self.persistent_modes.none:
self._load_persistence()
def run(self, run_in_dedicated_thread=True):
log.info("Server starting....")
if run_in_dedicated_thread:
import threading
self.server_thread = threading.Thread(target=self._start_server, args=())
self.server_thread.start()
else:
self._start_server()
log.info("Server started!")
def _start_server(self):
from thriftpy2.rpc import make_aio_server
import asyncio
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.server = make_aio_server(
service=self.service,
handler=self.sequence_manager,
host=self.host,
port=self.port,
client_timeout=self.client_timeout,
loop=self.loop,
)
self.server.serve()
def stop(self):
self.server.close()
self.loop.call_soon_threadsafe(self.loop.stop)
if self.persistent_mode is not self.persistent_modes.none:
self._save_persistence()
def _load_persistence(self):
if os.path.isfile(self.persistence_storage_path):
with open(self.persistence_storage_path) as storage_file:
data = json.load(storage_file)
self.sequence_manager.namespaces_seq_state = data
else:
log.info(
"No storage file found at {}. tpan will start as fresh instance.".format(
self.persistence_storage_path
)
)
def _save_persistence(self):
if not os.path.isdir(os.path.dirname(self.persistence_storage_path)):
os.makedirs(self.persistence_storage_path)
with open(self.persistence_storage_path, "w") as fout:
json.dump(self.sequence_manager.namespaces_seq_state, fout)
log.debug("Saved sequence state")
service ptanService {
/**
* A method definition looks like C code. It has a return type, arguments,
* and optionally a list of exceptions that it may throw. Note that argument
* lists and exception lists are specified using the exact same syntax as
* field lists in struct or exception definitions.
*/
string ping(),
#i64 getNextID(1:string string1),
string getNextID(1:string string1),
bool getConfIsTimestamped(),
i32 getConfTimestampMultiplier(),
}
\ No newline at end of file
import thriftpy2
import os
_SCRIPT_DIR = os.path.dirname(
os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))
)
_thrift_definiton_file_path = os.path.join(_SCRIPT_DIR, "ptanThriftDefinition.thrift")
ptanThriftService = thriftpy2.load(
_thrift_definiton_file_path, module_name="ptan_thrift"
)
import os
import sys
import time
import random
if __name__ == "__main__":
SCRIPT_DIR = os.path.dirname(
os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))
)
print(SCRIPT_DIR)
sys.path.append(os.path.normpath(SCRIPT_DIR))
from ptan.client import Client
c = Client()
print("> Try to ping server...")
print("Answer form server: ", c.ping_server())
test_id = c.getNextID()
print("First requested ID is {}".format(test_id))
if c.getServerConfIsTimestamped():
print(
"{} is timestamped for {}".format(
test_id, c.extractDateFromTimestampedID(test_id).isoformat()
)
)
print("> LOAD TEST...")
def runcl(n=0):
time.sleep(random.randint(1, 1000) / 1000000.0)
return Client().getNextID()
from concurrent.futures import ProcessPoolExecutor, wait
import concurrent.futures
futures = []
no_of_request = 10000
no_of_processes = 16
print(
"> CAPTAIN! {} cannons loaded with {} shots. ready for you command... ".format(
no_of_processes, no_of_request
)
)
print("> Fire at will!")
from linetimer import CodeTimer
with CodeTimer("Cached"):
with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
for i in range(0, no_of_request):
# results = executor.map(runcl)
futures.append(executor.submit(runcl))
print("> All fired, captain! Waiting for impacts...")
wait(futures)
res = [fut.result() for fut in futures]
print("generated {} ids".format(len(res)))
print("All ids are unique?:{}".format(str(len(res) == len(set(res)))))
print("> Get ID in new namepsace")
print(c.getNextID("NewNameSPace"))
print("> Get ID in old namepsace")
print(c.getNextID())
print("> And get ID in newer namepsace again")
print(c.getNextID("NewNameSPace"))
import os
import sys
if __name__ == "__main__":
SCRIPT_DIR = os.path.dirname(
os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))
)
print(SCRIPT_DIR)
sys.path.append(os.path.normpath(SCRIPT_DIR))
from ptan.server import Server
print("Starting Server")
server = Server(timestamped_seqs=True)
server.run(run_in_dedicated_thread=False)
from setuptools import setup
setup(
name="ptan",
version="1.0.1",
description="ptan is ticket('ticket' as 'sequencial int numbers') generator for distributed systems.",
url="",
author="TB",
author_email="tim.bleimehl@helmholtz-muenchen.de",
license="unlicense",
packages=["ptan", "ptan.shared.ptanThriftService"],
install_requires=["thriftpy2", "asyncio"],
zip_safe=False,
)
import os
import sys
import time
if __name__ == "__main__":
SCRIPT_DIR = os.path.dirname(
os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))
)
print(SCRIPT_DIR)
sys.path.append(os.path.normpath(SCRIPT_DIR))
from ptan.server import Server
print("Starting Server")
server = Server(persistent_mode=Server.persistent_modes.none)
# server = Server()
# We start the server with 'run_in_dedicated_thread' = True, otherwise the script will not continue as it will loop into the server thread
# Use run_in_dedicated_thread = False, if you are building a standalone server
server.run(run_in_dedicated_thread=True)
from ptan.client import Client
c = Client()
assert c.getNextID() == "1"
assert c.getNextID() == "2"
# Start a new ID sequence with a new namespace
assert c.getNextID("someNewNamespace") == "1"
# Return to the default sequence
assert c.getNextID() == "3"
assert c.getNextID("someNewNamespace") == "2"
time.sleep(1)
server.stop()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment