Source code for src.nb_gen_utils

# Copyright (c) 2015 Intracom S.A. Telecom Solutions. All rights reserved.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution,
# and is available at http://www.eclipse.org/legal/epl-v10.html

""" Helper functions for NorthBound traffic generator """

import flow_utils
import ipaddress
import json
import logging
import multiprocessing
import requests
import time

[docs]def create_workers(nworkers, flow_template, url_template, op_delay_ms, fpr, auth_token): """ Creates flow workers as separate processes along with their queues. :param nworkers: number of workers to create :param flow_template: template from which flows are created :param url_template: url in which each worker will issue flows. :param op_delay_ms: delay between flow operations in each worker (in milliseconds) :param auth_token: RESTconf authorization token (username/password tuple) :returns: worker queues (operational, result) and worker threads :rtype tuple<lst<multiprocessing.Queue()>> :type nworkers: int :type flow_template: str :type url_template: str :type op_delay_ms: int :type auth_token: tuple<str> """ opqueues = [] wthr = [] resqueues = [] for wid in range(0, nworkers): opqueue = multiprocessing.Queue() resqueue = multiprocessing.Queue() opqueues.append(opqueue) resqueues.append(resqueue) worker = multiprocessing.Process(target=flow_worker_thread, args=(wid, opqueue, resqueue, flow_template, url_template, op_delay_ms, fpr, auth_token)) wthr.append(worker) return (opqueues, wthr, resqueues)
[docs]def flow_worker_thread(wid, opqueue, resqueue, flow_template, url_template, op_delay_ms, fpr, auth_token): """ Function executed by flow worker thread :param wid: worker id :param opqueue: queue where flow master will issue operations :param resqueue:queue where flow master will issue operations :param flow_template: template from which flows are created :param url_template: template for the url of the REST call :param op_delay_ms: delay between thread operations (in milliseconds) :param auth_token: RESTconf authorization token (username/password tuple) :type wid: int :type opqueue: multiprocessing.Queue :type resqueue: multiprocessing.Queue :type flow_template: str :type url_template: str :type op_delay_ms: int :type auth_token: tuple<str> """ logging.debug('[flow_worker_thread] Worker {0} initiating.'.format(wid)) flow_processor = flow_utils.FlowProcessor(flow_template, url_template, auth_token) failed_flow_ops = 0 flow_add_lists = {} # Read request from queue # Op type could be A/D/T, for add/deletion and termination respectively. while True: try: op_type,of_node,flow_id,current_ip = opqueue.get(block=True, timeout=10000) logging.debug( '[flow_worker_thread] Worker {0}, received operation' ' = ( OP = {1}, Node = {2}, Flow-id = {3})'.format(wid, op_type, of_node, flow_id)) if op_type == 'T': logging.debug('[flow_worker_thread] Sending remaining ' 'flows for addition before terminating ' 'worker thread {0}'.format(wid)) for of_node, flow_list in flow_add_lists.items(): if len(flow_list) > 0: status = flow_processor.add_flow(','.join(flow_list), of_node) if status != 200 and status != 204: logging.debug('[flow_worker_thread] failed to add flow.') failed_flow_ops += 1 logging.debug('[flow_worker_thread] ' 'Worker {0} will terminate.'.format(wid)) resqueue.put(failed_flow_ops) return 0 elif op_type == 'A': flow_data = flow_template % (flow_id, 'TestFlow-%d' % flow_id, 65000, str(flow_id), 65000, current_ip) if of_node in flow_add_lists: flow_add_lists[of_node].append(flow_data) else: flow_add_lists[of_node] = [] flow_add_lists[of_node].append(flow_data) if len(flow_add_lists[of_node]) == fpr: status = flow_processor.add_flow(','.join(flow_add_lists[of_node]), of_node) flow_add_lists[of_node][:] = [] logging.debug('[flow_worker_thread] [op_type]: op_type = A ' '(Adding flow)| Status code of the response: ' '{0}.'.format(status)) if status != 200 and status != 204: logging.debug('[flow_worker_thread] failed to add flow.') failed_flow_ops += 1 elif op_type == 'D': print('enter delete function in worker thread') status = flow_processor.remove_flow(flow_id, of_node) logging.debug('[flow_worker_thread] [op_type]: op_type = D ' '(Remove flow)| Status code of the response: ' '{0}.'.format(status)) if status != 200 and status != 204: logging.debug('[flow_worker_thread] failed to delete flow.') failed_flow_ops += 1 time.sleep(float(op_delay_ms)/1000) except: logging.error('[flow_worker_thread] Unable to process rest requests. ' 'Worker {0} will terminate.'.format(wid)) resqueue.put(failed_flow_ops) return -1
[docs]def distribute_workload(nflows, opqueues, operation, node_names): """ Creates operations of the form (operation, 'target_switch', uid, 'IP_address'), one for each flow, and distributes them in a round robin fashion to the worker queues. The operation can be a flow ADD or REMOVE. The target_switch is the switch on which the operation will be applied. The uid is a unique id for the operation. IP_adress is the IP address that will populate the destination IP of the flow created by a template. :param nflows: total number of flows to distribute :param opqueues: workers operation queues :param operation: operation to perform (Add 'A' or Delete 'D') :param node_names: array Name of each node in Opendaylight datastore. :type nflows: int :type opqueues: list<multiprocessing.Queue> :type operation: str :type nodenames: list<str> """ curr_ip = ipaddress.ip_address('0.0.0.0') nworkers = len(opqueues) nnodes = len(node_names) for flow in range(0, nflows): dest_node = flow % nnodes node_name = node_names[dest_node] operation_info = (operation, node_name, flow, curr_ip.compressed) qid = (dest_node - 1) % nworkers opqueues[qid].put(operation_info) curr_ip = curr_ip + 1
[docs]def join_workers(opqueues, resqueues, wthr): """ Terminates all workers by sending a T operation. :param opqueues: workers operation queues :param resqueues: workers result queues :param wthr: worker thread objects :returns: failed_flow_ops :rtype: failed_flow_ops int :type: opqueues list<multiprocessing.Queue> :type: resqueues list<multiprocessing.Queue> :type: wthr """ for curr_queue in opqueues: curr_queue.put(('T', 0, 0, '0')) for worker in wthr: worker.join() failed_flow_ops = 0 for resqueue in resqueues: failed_flow_ops += resqueue.get(block=True) logging.debug('[flow_master_thread] {0} workers terminated.'. format(len(opqueues))) return failed_flow_ops
[docs]def get_node_names(ctrl_ip, ctrl_port, auth_token): """ Fetch node names from the OpenDaylight operational DS :param ctrl_ip: controller IP :param ctrl_port: controller RESTconf port :param auth_token: RESTconf authorization token (username/password tuple) :returns: list with node names registered in operational DS :rtype: node_names: list<str> :type ctrl_ip: str :type ctrl_port: int :type auth_token: tuple<str> """ getheaders = {'Accept': 'application/json'} url_request = ('http://{0}:{1}/restconf/operational/network-topology:' 'network-topology/network-topology:topology/flow:1'. format(ctrl_ip, ctrl_port)) node_names = [] session = requests.Session() session.trust_env = False try: while True: logging.debug( '[flow_master_thread] Trying to fetch node names from datastore') request = session.get(url_request, headers=getheaders, stream=False, auth=(auth_token.controller_restconf_user, auth_token.controller_restconf_password)) json_topology = json.loads(request.text) nodes = json_topology.get('topology')[0].get('node') if nodes is not None: break for node in nodes: node_names.append(node.get('node-id')) return node_names except: raise ValueError('[ERROR] Fail getting node names')
[docs]def flow_transmission_start(opqueues, resqueues, wthr, nflows, ctrl_ip, ctrl_port, auth_token): """Calculates transmission_interval, operation_time, failed_flow_ops :param opqueues: workers operation queues :param resqueues: workers result queues :param wthr: worker threads :param nflows: :param ctrl_ip: :param ctrl_port: :param auth_token: :returns (transmission_interval, operation_time, failed_flow_ops): transmission interval: time interval between requested flow operations operation time: total time failed flow operations: :rtype tuple<str> :type opqueues: list<multiprocessing.Queue()> :type resqueues: list<multiprocessing.Queue()> :type wthr: list<multiprocessing.Process()> :type nflows: int :type ctrl_ip: str :type ctrl_port: str :type auth_token: tuple<str> """ failed_flow_ops = 0 logging.info('[flow_master_thread] starting workers') for worker_thread in wthr: worker_thread.start() logging.info('[flow_operations_calc_time] joining workers') failed_flow_ops += join_workers(opqueues, resqueues, wthr) return failed_flow_ops
[docs]def flows_transmission_run(flow_ops_params, op_delay_ms, node_names, url_template, flow_template, auth_token, fpr, delete_flows_flag=False): """Function executed by flow_master method :param flow_ops_params: namedtuple ['ctrl_ip', 'ctrl_port', 'nflows', 'nworkers'], (controller IP, controller RESTconf port, total number of flows to distribute, number of worker threads to create, deadline for flow discovery (in milliseconds) :param op_delay_ms: delay between thread operations (in milliseconds) :param node_names: list with node names registered in operational DS :param url_template: url for REST request to add/delete flows in controller's operational DS :param flow_template: template of flow in json form to be added/deleted in controller's operational DS :param auth_token: token containing restconf username/password used for REST requests in controller's operational DS :param delete_flows_flag: whether to delete or not the added flows as part of the test :returns tuple with transmission_interval, operation_time, failed_flow_ops transmission interval: time interval between requested flow operations operation time: total time failed flow operations: :rtype: tuple: :type ctrl_ip: str :type ctrl_port: str :type nflows: int :type nworkers: int :type op_delay_ms: int :type node_names: list<str> :type url_template: str :type flow_template: str :type auth_token: tuple<str> :type delete_flows_flag: bool """ operations_log_message = 'ADD' operations_type = 'A' if delete_flows_flag: operations_log_message = 'DEL' operations_type = 'D' logging.info('[flow_ops_calc_time_run] initializing: will perform {0} flow ' 'operations at {1} openflow nodes with {2} workers'.format( flow_ops_params.nflows, len(node_names), flow_ops_params.nworkers)) logging.info('[flow_ops_calc_time_run] creating workers for {0} ops'. format(operations_log_message)) opqueues, wthr, resqueues = create_workers(flow_ops_params.nworkers, flow_template, url_template, op_delay_ms, fpr, auth_token) logging.info('[flow_ops_calc_time_run] distributing workload') distribute_workload(flow_ops_params.nflows, opqueues, operations_type, node_names) failed_flow_ops = flow_transmission_start(opqueues, resqueues, wthr, flow_ops_params.nflows, flow_ops_params.ctrl_ip, flow_ops_params.ctrl_port, auth_token) return failed_flow_ops