Source code for src.flow_utils
__author__ = "Jan Medved"
__copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
__license__ = "New-style BSD"
__email__ = "jmedved@cisco.com"
# Copyright (c) 2015 Intracom S.A. Telecom Solutions. All rights reserved.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution,
# and is available at http://www.eclipse.org/legal/epl-v10.html
"""
Reusable classes or functions for processes that are flow manipulation related
url: https://wiki.opendaylight.org/view/CrossProject:Integration_Group:Performance_Tests
Acknowledgement: Jan Medved, jmedved@cisco.com, Cisco Systems, Inc.
"""
import requests
import re
import json
import logging
[docs]class FlowProcessor(object):
"""
Helper object used to add and remove flows based on predefined templates.
"""
def __init__(self, flow_template, url_template, auth_token):
"""
:param flow_template: template of the actual json call representing \
a flow
:param url_template: template for the url used for each flow
:param auth_token: restconf authorization token (username/password tuple)
:type flow_template: str
:type url_template: str
:type auth_token: tuple<str>
"""
self.putheaders = {'content-type': 'application/json'}
self.getheaders = {'Accept': 'application/json'}
self.flow_template = flow_template
self.url_template = url_template
self.session = requests.Session()
self.session.trust_env = False
self.auth_token = auth_token
self.flow_template = """{"flow": [%s]}"""
[docs] def add_flow(self, flow_data_body, node_id):
"""
Adds a flow to the specified node
:param node_id: ID of the node to which we will add the flow
:param ip_dest: IP address to populate the destination IP field of the \
flow template
:returns: status code for the http call issued
:rtype: int
:type node_id: int
:type ip_dest: str
"""
# Disable logging during performing requests
logging.disable(logging.CRITICAL)
flow_data = self.flow_template % (flow_data_body)
flow_url = self.url_template % (node_id)
try:
request = self.session.post(flow_url, data=flow_data,
headers=self.putheaders, stream=False,
auth=self.auth_token)
return request.status_code
except:
return -1
finally:
# Enable logging after performing requests
logging.disable(logging.NOTSET)
[docs] def remove_flow(self, flow_id, node_id):
"""
Removes a flow from the specified node
:param flow_id: ID of the flow to remove
:param node_id: ID of the node from which the flow will be removed
:returns: status code for the http call issued
:rtype: int
:type flow_id: int
:type node_id: int
"""
# Disable logging during performing requests
logging.disable(logging.CRITICAL)
flow_url = self.url_template % (node_id, flow_id)
try:
request = self.session.delete(flow_url, headers=self.getheaders,
auth=self.auth_token)
return request.status_code
except:
return -1
finally:
# Enable logging after performing requests
logging.disable(logging.NOTSET)
[docs]class FlowExplorer(object):
"""
Object used to explore controller inventory flows, using the NB REST \
interface.
"""
def __init__(self, controller_ip, restconf_port, datastore, auth_token):
"""
:param controller_ip: controller IP address
:param restconf_port: controller RESTconf port number
:param datastore: type datastore elements to retrieve from operational \
datastore.
:param auth_token: restconf authorization token \
(username/password tuple)
:type controller_ip: str
:type restconf_port: int
:type datastore: str
:type auth_token: tuple<str>
"""
self.inventory_stats_url = \
'http://{0}:{1}/restconf/{2}/opendaylight-inventory:nodes'.format(
controller_ip, restconf_port, datastore)
self.found_flows = 0
self.active_flows = 0
self.table_stats_unavailable = 0
self.table_stats_fails = []
self.getheaders = {'Accept': 'application/json'}
self.auth_token = auth_token
[docs] def get_inventory_flows_stats(self):
"""
Collects and prints statistics information about all installed flows \
for all the nodes of the topology
"""
self.found_flows = 0
self.active_flows = 0
self.table_stats_unavailable = 0
self.table_stats_fails = []
# Disable logging during performing requests
logging.disable(logging.CRITICAL)
s = requests.Session()
s.trust_env = False
try:
req = s.get(self.inventory_stats_url,
headers=self.getheaders,
stream=False,
auth=self.auth_token)
str_response = req.content.decode('utf-8')
except:
self.active_flows = 0
self.found_flows = 0
return -1
finally:
# Enable logging after performing requests
logging.disable(logging.NOTSET)
if req.status_code == 200:
try:
self.nodes = json.loads(str_response)['nodes']['node']
switches = []
for node in self.nodes:
if re.search('openflow', node['id']) is not None:
switches.append(node)
switches = sorted(
switches,
key=lambda k: int(
re.findall(
'\d+',
k['id'])[0]))
for switch in switches:
try:
tables = switch['flow-node-inventory:table']
for table in tables:
try:
stats = table['opendaylight-flow-table-statistics:flow-table-statistics']
self.active_flows = int(stats['active-flows'])
except KeyError:
self.table_stats_unavailable += 1
try:
self.found_flows += len(table['flow'])
except KeyError:
pass
if self.table_stats_unavailable > 0:
self.table_stats_fails.append(switch['id'])
except KeyError:
logging.error('Data for tables not available.')
except KeyError:
logging.error('Could not retrieve inventory, response not in '
'JSON format')
else:
logging.error('Could not retrieve inventory, HTTP error {0}'.
format(req.status_code))
s.close()