ficapy
7/21/2016 - 12:57 AM

精简pycallgraph代码,生成调用关系图 python3.5.1 代码解读https://ficapy.github.io/2016/07/22/pycallgraph_note/

精简pycallgraph代码,生成调用关系图 python3.5.1 代码解读https://ficapy.github.io/2016/07/22/pycallgraph_note/

===================================func_name====================================   nums
requests.sessions.Session.resolve_redirects                                         12
requests.adapters.HTTPAdapter.send                                                  9
requests.sessions.Session.send                                                      7
requests.packages.urllib3.connectionpool.HTTPConnectionPool.urlopen                 7
requests.packages.urllib3.connectionpool.HTTPSConnectionPool.urlopen                7

=====================================model======================================   nums
requests.sessions                                                                   46
urllib.parse                                                                        32
http.cookiejar                                                                      29
requests.cookies                                                                    22
requests.packages.urllib3.connectionpool                                            20
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '20/7/2016'
import inspect
import os
import re
import tempfile
import textwrap
import pkgutil
from collections import defaultdict
from distutils.sysconfig import get_python_lib
from functools import lru_cache


class Output():
    def __init__(self, filename='call.png'):
        self.filename = filename

    @property
    def edges(self):
        output = []
        for src_func, dests in self.data.items():
            for dst_func, calls in dests.items():
                singal = '"{}" -> "{}" ["label"={}];'.format(src_func, dst_func, calls)
                output.append(singal)
        return output

    @property
    def groups(self):
        output = []
        for group, funcs in self.group.items():
            # 排除一个funcs的
            if len(funcs) == 1:
                continue
            func = '" "'.join(funcs)
            output.append(
                'subgraph "cluster_{group}" {{ '
                '"{func}"; '
                'label = "{group}"; '
                'fontcolor = "black"; '
                'style = "bold"; }}'.format(group=group, func=func))
        return output

    def generate(self):
        '''Returns a string with the contents of a DOT file for Graphviz to
        parse.
        '''
        indent_join = '\n' + ' ' * 12

        return textwrap.dedent('''\
        digraph G {{

            // Attributes
            node [ style = "filled", fontname = "Verdana", shape = "rect", fontsize = "7", fontcolor = "#000000ff" ];
            edge [ fontname = "Verdana", fontsize = "7", fontcolor = "#000000ff" ];

            // Groups
            {}

            // Edges
            {}

        }}
        '''.format(
            indent_join.join(self.groups),
            indent_join.join(self.edges),
        ))

    @staticmethod
    def analyze(call_dict, grp):
        func_map_line = defaultdict(int)
        model_map_line = defaultdict(int)
        for src_func, dests in call_dict.items():
            func_map_line[src_func] += len(dests)
            for dst_func, calls in dests.items():
                func_map_line[dst_func] += 1

        print('{:=^80}{:^10}'.format('func_name', 'nums'))
        for func, nums in sorted(func_map_line.items(), key=lambda x: x[-1], reverse=True)[:5]:
            print('{:<80}{:^10}'.format(func, nums))

        for g, funcs in grp.items():
            for func in funcs:
                model_map_line[g] += func_map_line.get(func, 0)

        print('\n{:=^80}{:^10}'.format('model', 'nums'))
        for model, nums in sorted(model_map_line.items(), key=lambda x: x[-1], reverse=True)[:5]:
            print('{:<80}{:^10}'.format(model, nums))

    def __call__(self, data, group):
        self.data = data
        self.group = group
        source = self.generate()
        self.analyze(self.data, self.group)

        fd, temp_name = tempfile.mkstemp()
        with os.fdopen(fd, 'w') as f:
            f.write(source)

        cmd = '{} -T{} -o{} {}'.format('/usr/local/bin/dot', 'png', self.filename, temp_name)
        try:
            ret = os.system(cmd)
            if ret:
                raise Exception('The command "%(cmd)s" failed with error code %(ret)i.' % locals())
        finally:
            os.unlink(temp_name)


class PyCallGraph():
    def __init__(self, output=None, filter=None):
        self.frame_stack = ['__main__']
        self.call_dict = defaultdict(lambda: defaultdict(int))
        self.grp = defaultdict(set)
        self.max_call_frame = 9999
        self.output = output or Output()
        self.filter = filter or Filter()

    def dispatch_trace(self, frame, event, args):
        if event == 'call':
            self._trace_call(frame, event, args)
        elif event == 'return':
            self._trace_return(frame, event, args)
        return self.dispatch_trace

    def _get_readable_funcname(self, frame):
        func_name = type('Func_name', (object,), {'func_name': None, 'module_name': None})
        if isinstance(frame, str):
            func_name.func_name = frame
            func_name.module_name = frame
            return func_name
        code = frame.f_code

        full_name_list = []

        module = inspect.getmodule(code)
        if module:
            module_name = module.__name__
            if module_name == '__main__':
                module_name = ''
        else:
            module_name = ''

        if module_name:
            full_name_list.append(module_name)

        try:
            class_name = frame.f_locals['self'].__class__.__name__
        except (KeyError, AttributeError):
            pass
        else:
            full_name_list.append(class_name)

        func = code.co_name
        if func == '?':
            func = '__main__'
        full_name_list.append(func)

        func_name.func_name = '.'.join(full_name_list)
        func_name.module_name = module_name
        return func_name

    def _trace_call(self, frame, event, args):
        if len(self.frame_stack) >= self.max_call_frame:
            return
        func_name = self._get_readable_funcname(frame)
        if self.filter(func_name):
            self.max_call_frame = len(self.frame_stack) + 1

        src_func = self._get_readable_funcname(self.frame_stack[-1])
        self.filter(src_func)

        self.call_dict[src_func.func_name][func_name.func_name] += 1
        self.grp[func_name.module_name].add(func_name.func_name)

        self.frame_stack.append(frame)

    def _trace_return(self, frame, event, args):
        if frame is self.frame_stack[-1]:
            self.frame_stack.pop(-1)
            self.max_call_frame = 9999

    def __enter__(self):
        sys.settrace(self.dispatch_trace)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.settrace(None)
        self.filter.exec_exclude(self)
        self.output(data=self.call_dict, group=self.grp)


inspect.getmodule = lru_cache()(inspect.getmodule)


class Filter():
    site_package = []
    bulit_in = []
    for filefinder, name, _ in pkgutil.iter_modules():
        if get_python_lib() == filefinder.path:
            site_package.append(name)
        elif '/lib/' in filefinder.path:
            bulit_in.append(name)

    def __init__(self, exclude=None):
        self.exclude = exclude or []

    def __call__(self, func_name):
        model = func_name.module_name
        name = func_name.func_name
        if model in self.exclude:
            func_name.func_name = model
            return True
        if model in self.bulit_in:
            func_name.func_name = model
            return True
        if '.' not in name:
            return True
        if '._' in name or name.startswith('_'):
            return True
        if re.search(r'pycallgraph', name, re.I):
            return True

        return False

    def exec_exclude(self, pycallgraph):
        for model in self.exclude:
            self._del_element(model, pycallgraph)

    def _del_element(self, element, pycallgraph):
        pycallgraph.call_dict.pop(element, None)
        for src_func, dst_func in pycallgraph.call_dict.items():
            dst_func.pop(element, None)

        pycallgraph.grp.pop(element, None)
        for _, x in pycallgraph.grp.items():
            x.discard(element)


import sys
import requests

filter = Filter()
filter.exclude = ['requests.structures', 'requests.models', 'requests.utils', '', 'collections.abc', 'abc',
                  'email.message']
with PyCallGraph(output=Output(filename='requests.png'), filter=filter) as py:
    requests.get('http://www.z.cn')