Skip to content

Instantly share code, notes, and snippets.

@pkittenis
Created October 14, 2017 01:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pkittenis/e0221acae77fd0fc251951dd9c6aa69b to your computer and use it in GitHub Desktop.
Save pkittenis/e0221acae77fd0fc251951dd9c6aa69b to your computer and use it in GitHub Desktop.
from pssh.pssh_client import ParallelSSHClient
from pssh.pssh2_client import ParallelSSHClient as ParallelSSH2Client
from pssh.utils import load_private_key
from gevent import monkey, sleep, threadpool, spawn, joinall
import os
import socket
import sys
from threading import Thread
try:
from Queue import PriorityQueue
except ImportError:
from queue import PriorityQueue
from datetime import datetime
import pwd
from decimal import Decimal
import time
# Remote command that run_test() executes on every host.
CMD = 'cat Projects/ssh2-python/LICENSE'
# Sample file path, relative to the home directory, and its expanded form.
# NOTE(review): neither FILE nor FILE_ABS is referenced in the visible code.
FILE = 'dl/PDF%20File%20Reader_1.4_apk-dl.com.apk'
FILE_ABS = os.path.expanduser('~/' + FILE)
# Address that send_to_graphite() connects its TCP socket to.
# NOTE(review): 2003 is presumably Graphite's plaintext listener - confirm.
graph_host = 'localhost'
graph_port = 2003
# Host name repeated to build the host list handed to the SSH clients.
ssh_host = 'localhost'
# Shared queue of metrics messages, drained by the thread that
# start_graph_thread() starts.
_QUEUE = PriorityQueue()
def send_to_graphite(queue):
graph_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
graph_sock.connect((graph_host, graph_port))
while True:
message = queue.get()
graph_sock.sendall(message)
queue.task_done()
def _make_data(app, commands_start, chan_read,
commands_finished, commands_restart, total, clients=1):
data = {
'.'.join([app,'auth_and_execute']): format(Decimal(
str(commands_start.total_seconds() * 1000)), '.3f'),
'.'.join([app,'execute']): format(Decimal(
str(commands_restart.total_seconds() * 1000)), '.3f'),
'.'.join([app,'channel_read']): format(Decimal(
str(chan_read.total_seconds() * 1000)), '.3f'),
'.'.join([app,'close_and_exit_status']): format(Decimal(
str(commands_finished.total_seconds() * 1000)), '.3f'),
'.'.join([app,'total']): format(Decimal(
str(total.total_seconds() * 1000)), '.3f'),
'.'.join([app,'clients']): str(clients),
}
return data
def queue_data(queue, data, priority):
message = make_message(data)
queue.put(message, priority)
def run_test(_queue, client, name):
    """Run one timed benchmark round with ``client`` and queue its metrics.

    The round consists of four timed phases: running ``CMD``, joining on the
    output, draining every host's stdout, and running ``CMD`` a second time.
    The timings are passed to ``_make_data`` and queued with the client's
    pool size as priority. If either ``run_command`` call raises, a
    ``"<name>.failure"`` metric is queued instead and the round is abandoned.

    :param _queue: Queue handed to ``queue_data``.
    :param client: Parallel SSH client exposing ``pool_size``,
      ``run_command`` and ``join``.
    :param name: Prefix used for the series names.
    """
    pool_size = client.pool_size
    failure = {'%s.failure' % name: '1'}

    started = datetime.now()

    # Phase 1: first command.
    mark = datetime.now()
    try:
        output = client.run_command(CMD)
    except Exception:
        queue_data(_queue, failure, pool_size)
        return
    commands_start = datetime.now() - mark

    # Phase 2: wait for the commands to complete.
    mark = datetime.now()
    client.join(output)
    commands_finished = datetime.now() - mark

    # Phase 3: consume stdout of every host that has one; the lines
    # themselves are discarded.
    mark = datetime.now()
    for host_output in output.values():
        if host_output.stdout is not None:
            for _line in host_output.stdout:
                pass
    chan_read = datetime.now() - mark

    # Phase 4: run the command again with the same client.
    mark = datetime.now()
    try:
        client.run_command(CMD)
    except Exception:
        queue_data(_queue, failure, pool_size)
        return
    commands_restart = datetime.now() - mark

    total = datetime.now() - started
    metrics = _make_data(
        name, commands_start, chan_read, commands_finished,
        commands_restart, total, clients=pool_size)
    queue_data(_queue, metrics, pool_size)
def make_message(data):
    """Make and return metrics message(s).

    Every entry of ``data`` becomes one newline-terminated line of the form
    ``<series> <value> <timestamp>``, where the timestamp is the current
    Unix time in whole seconds. All lines share the same timestamp.

    :param data: Dictionary mapping series name to value, both text strings.
    :returns: The concatenated lines as bytes; ``b""`` for an empty ``data``.
    """
    # strftime("%s") is a platform-specific libc extension rather than a
    # documented directive, so take the epoch seconds from time.time().
    timestamp = str(int(time.time())).encode('utf-8')
    return b"".join(
        b"%s %s %s\n" % (serie.encode('utf-8'),
                         value.encode('utf-8'),
                         timestamp)
        for serie, value in data.items())
def start_graph_thread():
    """Start the background thread that drains ``_QUEUE`` and return it.

    The thread runs ``send_to_graphite(_QUEUE)`` and is marked as a daemon,
    so it does not keep the interpreter alive on its own.

    :returns: The started ``Thread`` object.
    """
    sender = Thread(target=send_to_graphite, args=(_QUEUE,))
    sender.daemon = True
    sender.start()
    return sender
def run_paramiko_workers(hosts):
    """Benchmark the paramiko based client against ``hosts``.

    Builds a ``ParallelSSHClient`` with one pool slot per host, a single
    retry, no SSH agent and the private key loaded from ``~/.ssh/local``,
    then runs ``run_test`` with the series prefix ``'paramiko'``.

    :param hosts: List of host names to connect to.
    :returns: Whatever ``run_test`` returns.
    """
    options = dict(
        num_retries=1,
        pool_size=len(hosts),
        allow_agent=False,
        pkey=load_private_key(os.path.expanduser('~/.ssh/local')),
    )
    paramiko_client = ParallelSSHClient(hosts, **options)
    return run_test(_QUEUE, paramiko_client, 'paramiko')
def run_ssh2_workers(hosts):
    """Benchmark the ssh2 based client against ``hosts``.

    Builds a ``ParallelSSH2Client`` with one pool slot per host, a single
    retry, no SSH agent and the private key path ``~/.ssh/local``, then runs
    ``run_test`` with the series prefix ``'ssh2'``.

    :param hosts: List of host names to connect to.
    :returns: Whatever ``run_test`` returns.
    """
    options = dict(
        num_retries=1,
        pool_size=len(hosts),
        allow_agent=False,
        pkey=os.path.expanduser('~/.ssh/local'),
    )
    ssh2_client = ParallelSSH2Client(hosts, **options)
    return run_test(_QUEUE, ssh2_client, 'ssh2')
def _start(num_workers, step=1):
    """Run ssh2 benchmark rounds with a growing number of connections.

    Rounds use 1, 1 + step, 1 + 2 * step, ... connections to ``ssh_host``,
    for every size below ``num_workers + step + 1``; the last round can
    therefore use more than ``num_workers`` connections.

    :param num_workers: Base value of the upper bound of the pool size.
    :param step: Increment of the pool size between rounds.
    """
    for pool_size in range(1, num_workers + step + 1, step):
        targets = [ssh_host] * pool_size
        # run_paramiko_workers(targets)
        run_ssh2_workers(targets)
def _start_ssh2(num_workers, step=1):
    """Run ssh2 benchmark rounds, spacing round starts at least 2s apart.

    Pool sizes follow ``range(1, num_workers + step + 1, step)``. A round
    that finishes in under two seconds is followed by a sleep for the
    remainder of those two seconds.

    :param num_workers: Base value of the upper bound of the pool size.
    :param step: Increment of the pool size between rounds.
    """
    min_round_seconds = 2
    for pool_size in range(1, num_workers + step + 1, step):
        targets = [ssh_host] * pool_size
        round_started = datetime.now()
        run_ssh2_workers(targets)
        spent = datetime.now() - round_started
        if spent.total_seconds() < min_round_seconds:
            sleep(min_round_seconds - spent.total_seconds())
def test():
    """Run one ssh2 round and one paramiko round with five connections.

    Waits until every queued metrics message has been processed and then
    terminates the process with exit status 0 (raises ``SystemExit``).
    """
    targets = [ssh_host] * 5
    for run_workers in (run_ssh2_workers, run_paramiko_workers):
        run_workers(targets)
    _QUEUE.join()
    sys.exit(0)
# Script entry point: python <script> <max concurrency>
if __name__ == "__main__":
    try:
        max_workers = int(sys.argv[1])
    except IndexError:
        sys.stderr.write("Run as %s <max concurrency>\n" % sys.argv[0])
        sys.exit(1)
    except (TypeError, ValueError):
        # int() raises ValueError, not TypeError, for a non-numeric string.
        sys.stderr.write("Max workers must be integer\n")
        sys.exit(1)
    graph_thread = start_graph_thread()
    # test()
    _start(max_workers)
    # _start_ssh2(max_workers)
    # Block until the sender thread has processed every queued message.
    _QUEUE.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment