mirror of https://github.com/pallets/flask.git
				
				
				
			
		
			
				
	
	
		
			300 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			300 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
#!/usr/bin/env python
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
"""
 | 
						|
    flask-07-upgrade
 | 
						|
    ~~~~~~~~~~~~~~~~
 | 
						|
 | 
						|
    This command line script scans a whole application tree and attempts to
 | 
						|
    output an unified diff with all the changes that are necessary to easily
 | 
						|
    upgrade the application to 0.7 and to not yield deprecation warnings.
 | 
						|
 | 
						|
    This will also attempt to find `after_request` functions that don't modify
 | 
						|
    the response and appear to be better suited for `teardown_request`.
 | 
						|
 | 
						|
    This application is indeed an incredible hack, but because what it
 | 
						|
    attempts to accomplish is impossible to do statically it tries to support
 | 
						|
    the most common patterns at least.  The diff it generates should be
 | 
						|
    hand reviewed and not applied blindly without making backups.
 | 
						|
 | 
						|
    :copyright: (c) Copyright 2011 by Armin Ronacher.
 | 
						|
    :license: see LICENSE for more details.
 | 
						|
"""
 | 
						|
import re
 | 
						|
import os
 | 
						|
import inspect
 | 
						|
import difflib
 | 
						|
import posixpath
 | 
						|
from optparse import OptionParser
 | 
						|
 | 
						|
try:
 | 
						|
    import ast
 | 
						|
except ImportError:
 | 
						|
    ast = None
 | 
						|
 | 
						|
 | 
						|
TEMPLATE_LOOKAHEAD = 4096
 | 
						|
 | 
						|
_app_re_part = r'((?:[a-zA-Z_][a-zA-Z0-9_]*app)|app|application)'
 | 
						|
_string_re_part = r"('([^'\\]*(?:\\.[^'\\]*)*)'" \
 | 
						|
                  r'|"([^"\\]*(?:\\.[^"\\]*)*)")'
 | 
						|
 | 
						|
_from_import_re = re.compile(r'^\s*from flask import\s+')
 | 
						|
_url_for_re = re.compile(r'\b(url_for\()(%s)' % _string_re_part)
 | 
						|
_render_template_re = re.compile(r'\b(render_template\()(%s)' % _string_re_part)
 | 
						|
_after_request_re = re.compile(r'((?:@\S+\.(?:app_)?))(after_request)(\b\s*$)(?m)')
 | 
						|
_module_constructor_re = re.compile(r'([a-zA-Z0-9_][a-zA-Z0-9_]*)\s*=\s*Module'
 | 
						|
                                    r'\(__name__\s*(?:,\s*(?:name\s*=\s*)?(%s))?' %
 | 
						|
                                    _string_re_part)
 | 
						|
_error_handler_re = re.compile(r'%s\.error_handlers\[\s*(\d+)\s*\]' % _app_re_part)
 | 
						|
_mod_route_re = re.compile(r'@([a-zA-Z0-9_][a-zA-Z0-9_]*)\.route')
 | 
						|
_blueprint_related = [
 | 
						|
    (re.compile(r'request\.module'), 'request.blueprint'),
 | 
						|
    (re.compile(r'register_module'), 'register_blueprint'),
 | 
						|
    (re.compile(r'%s\.modules' % _app_re_part), '\\1.blueprints')
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
def make_diff(filename, old, new):
 | 
						|
    for line in difflib.unified_diff(old.splitlines(), new.splitlines(),
 | 
						|
                     posixpath.normpath(posixpath.join('a', filename)),
 | 
						|
                     posixpath.normpath(posixpath.join('b', filename)),
 | 
						|
                     lineterm=''):
 | 
						|
        print line
 | 
						|
 | 
						|
 | 
						|
def looks_like_teardown_function(node):
 | 
						|
    returns = [x for x in ast.walk(node) if isinstance(x, ast.Return)]
 | 
						|
    if len(returns) != 1:
 | 
						|
        return
 | 
						|
    return_def = returns[0]
 | 
						|
    resp_name = node.args.args[0]
 | 
						|
    if not isinstance(return_def.value, ast.Name) or \
 | 
						|
       return_def.value.id != resp_name.id:
 | 
						|
        return
 | 
						|
 | 
						|
    for body_node in node.body:
 | 
						|
        for child in ast.walk(body_node):
 | 
						|
            if isinstance(child, ast.Name) and \
 | 
						|
               child.id == resp_name.id:
 | 
						|
                if child is not return_def.value:
 | 
						|
                    return
 | 
						|
 | 
						|
    return resp_name.id
 | 
						|
 | 
						|
 | 
						|
def fix_url_for(contents, module_declarations=None):
 | 
						|
    if module_declarations is None:
 | 
						|
        skip_module_test = True
 | 
						|
    else:
 | 
						|
        skip_module_test = False
 | 
						|
        mapping = dict(module_declarations)
 | 
						|
    annotated_lines = []
 | 
						|
 | 
						|
    def make_line_annotations():
 | 
						|
        if not annotated_lines:
 | 
						|
            last_index = 0
 | 
						|
            for line in contents.splitlines(True):
 | 
						|
                last_index += len(line)
 | 
						|
                annotated_lines.append((last_index, line))
 | 
						|
 | 
						|
    def backtrack_module_name(call_start):
 | 
						|
        make_line_annotations()
 | 
						|
        for idx, (line_end, line) in enumerate(annotated_lines):
 | 
						|
            if line_end > call_start:
 | 
						|
                for _, line in reversed(annotated_lines[:idx]):
 | 
						|
                    match = _mod_route_re.search(line)
 | 
						|
                    if match is not None:
 | 
						|
                        shortname = match.group(1)
 | 
						|
                        return mapping.get(shortname)
 | 
						|
 | 
						|
    def handle_match(match):
 | 
						|
        if not skip_module_test:
 | 
						|
            modname = backtrack_module_name(match.start())
 | 
						|
            if modname is None:
 | 
						|
                return match.group(0)
 | 
						|
        prefix = match.group(1)
 | 
						|
        endpoint = ast.literal_eval(match.group(2))
 | 
						|
        if endpoint.startswith('.'):
 | 
						|
            endpoint = endpoint[1:]
 | 
						|
        elif '.' not in endpoint:
 | 
						|
            endpoint = '.' + endpoint
 | 
						|
        else:
 | 
						|
            return match.group(0)
 | 
						|
        return prefix + repr(endpoint)
 | 
						|
    return _url_for_re.sub(handle_match, contents)
 | 
						|
 | 
						|
 | 
						|
def fix_teardown_funcs(contents):
 | 
						|
 | 
						|
    def is_return_line(line):
 | 
						|
        args = line.strip().split()
 | 
						|
        return args and args[0] == 'return'
 | 
						|
 | 
						|
    def fix_single(match, lines, lineno):
 | 
						|
        if not lines[lineno + 1].startswith('def'):
 | 
						|
            return
 | 
						|
        block_lines = inspect.getblock(lines[lineno + 1:])
 | 
						|
        func_code = ''.join(block_lines)
 | 
						|
        if func_code[0].isspace():
 | 
						|
            node = ast.parse('if 1:\n' + func_code).body[0].body
 | 
						|
        else:
 | 
						|
            node = ast.parse(func_code).body[0]
 | 
						|
        response_param_name = looks_like_teardown_function(node)
 | 
						|
        if response_param_name is None:
 | 
						|
            return
 | 
						|
        before = lines[:lineno]
 | 
						|
        decorator = [match.group(1) +
 | 
						|
                     match.group(2).replace('after_', 'teardown_') +
 | 
						|
                     match.group(3)]
 | 
						|
        body = [line.replace(response_param_name, 'exception')
 | 
						|
                for line in block_lines if
 | 
						|
                not is_return_line(line)]
 | 
						|
        after = lines[lineno + len(block_lines) + 1:]
 | 
						|
        return before + decorator + body + after
 | 
						|
 | 
						|
    content_lines = contents.splitlines(True)
 | 
						|
    while 1:
 | 
						|
        found_one = False
 | 
						|
        for idx, line in enumerate(content_lines):
 | 
						|
            match = _after_request_re.match(line)
 | 
						|
            if match is None:
 | 
						|
                continue
 | 
						|
            new_content_lines = fix_single(match, content_lines, idx)
 | 
						|
            if new_content_lines is not None:
 | 
						|
                content_lines = new_content_lines
 | 
						|
                break
 | 
						|
        else:
 | 
						|
            break
 | 
						|
 | 
						|
    return ''.join(content_lines)
 | 
						|
 | 
						|
 | 
						|
def get_module_autoname(filename):
 | 
						|
    directory, filename = os.path.split(filename)
 | 
						|
    if filename != '__init__.py':
 | 
						|
        return os.path.splitext(filename)[0]
 | 
						|
    return os.path.basename(directory)
 | 
						|
 | 
						|
 | 
						|
def rewrite_from_imports(prefix, fromlist, lineiter):
 | 
						|
    import_block = [prefix, fromlist]
 | 
						|
    if fromlist[0] == '(' and fromlist[-1] != ')':
 | 
						|
        for line in lineiter:
 | 
						|
            import_block.append(line)
 | 
						|
            if line.rstrip().endswith(')'):
 | 
						|
                break
 | 
						|
    elif fromlist[-1] == '\\':
 | 
						|
        for line in lineiter:
 | 
						|
            import_block.append(line)
 | 
						|
            if line.rstrip().endswith('\\'):
 | 
						|
                break
 | 
						|
 | 
						|
    return ''.join(import_block).replace('Module', 'Blueprint')
 | 
						|
 | 
						|
 | 
						|
def rewrite_blueprint_imports(contents):
 | 
						|
    new_file = []
 | 
						|
    lineiter = iter(contents.splitlines(True))
 | 
						|
    for line in lineiter:
 | 
						|
        match = _from_import_re.search(line)
 | 
						|
        if match is not None:
 | 
						|
            new_file.extend(rewrite_from_imports(match.group(),
 | 
						|
                                                 line[match.end():],
 | 
						|
                                                 lineiter))
 | 
						|
        else:
 | 
						|
            new_file.append(line)
 | 
						|
    return ''.join(new_file)
 | 
						|
 | 
						|
 | 
						|
def rewrite_for_blueprints(contents, filename):
 | 
						|
    modules_declared = []
 | 
						|
    def handle_match(match):
 | 
						|
        target = match.group(1)
 | 
						|
        name_param = match.group(2)
 | 
						|
        if name_param is None:
 | 
						|
            modname = get_module_autoname(filename)
 | 
						|
        else:
 | 
						|
            modname = ast.literal_eval(name_param)
 | 
						|
        modules_declared.append((target, modname))
 | 
						|
        return '%s = %s' % (target, 'Blueprint(%r, __name__' % modname)
 | 
						|
    new_contents = _module_constructor_re.sub(handle_match, contents)
 | 
						|
 | 
						|
    if modules_declared:
 | 
						|
        new_contents = rewrite_blueprint_imports(new_contents)
 | 
						|
 | 
						|
    for pattern, replacement in _blueprint_related:
 | 
						|
        new_contents = pattern.sub(replacement, new_contents)
 | 
						|
    return new_contents, dict(modules_declared)
 | 
						|
 | 
						|
 | 
						|
def upgrade_python_file(filename, contents, teardown):
 | 
						|
    new_contents = contents
 | 
						|
    if teardown:
 | 
						|
        new_contents = fix_teardown_funcs(new_contents)
 | 
						|
    new_contents, modules = rewrite_for_blueprints(new_contents, filename)
 | 
						|
    new_contents = fix_url_for(new_contents, modules)
 | 
						|
    new_contents = _error_handler_re.sub('\\1.error_handler_spec[None][\\2]',
 | 
						|
                                         new_contents)
 | 
						|
    make_diff(filename, contents, new_contents)
 | 
						|
 | 
						|
 | 
						|
def upgrade_template_file(filename, contents):
 | 
						|
    new_contents = fix_url_for(contents, None)
 | 
						|
    make_diff(filename, contents, new_contents)
 | 
						|
 | 
						|
 | 
						|
def walk_path(path):
 | 
						|
    this_file = os.path.realpath(__file__).rstrip('c')
 | 
						|
    for dirpath, dirnames, filenames in os.walk(path):
 | 
						|
        dirnames[:] = [x for x in dirnames if not x.startswith('.')]
 | 
						|
        for filename in filenames:
 | 
						|
            filename = os.path.join(dirpath, filename)
 | 
						|
            if os.path.realpath(filename) == this_file:
 | 
						|
                continue
 | 
						|
            if filename.endswith('.py'):
 | 
						|
                yield filename, 'python'
 | 
						|
            # skip files that are diffs.  These might be false positives
 | 
						|
            # when run multiple times.
 | 
						|
            elif not filename.endswith(('.diff', '.patch', '.udiff')):
 | 
						|
                with open(filename) as f:
 | 
						|
                    contents = f.read(TEMPLATE_LOOKAHEAD)
 | 
						|
                if '{% for' or '{% if' or '{{ url_for' in contents:
 | 
						|
                    yield filename, 'template'
 | 
						|
 | 
						|
 | 
						|
def scan_path(path=None, teardown=True):
 | 
						|
    for filename, type in walk_path(path):
 | 
						|
        with open(filename) as f:
 | 
						|
            contents = f.read()
 | 
						|
        if type == 'python':
 | 
						|
            upgrade_python_file(filename, contents, teardown)
 | 
						|
        elif type == 'template':
 | 
						|
            upgrade_template_file(filename, contents)
 | 
						|
 | 
						|
 | 
						|
def main():
 | 
						|
    """Entrypoint"""
 | 
						|
    parser = OptionParser(usage='%prog [options] [paths]')
 | 
						|
    parser.add_option('-T', '--no-teardown-detection', dest='no_teardown',
 | 
						|
                      action='store_true', help='Do not attempt to '
 | 
						|
                      'detect teardown function rewrites.')
 | 
						|
    parser.add_option('-b', '--bundled-templates', dest='bundled_tmpl',
 | 
						|
                      action='store_true', help='Indicate to the system '
 | 
						|
                      'that templates are bundled with modules.  Default '
 | 
						|
                      'is auto detect.')
 | 
						|
    options, args = parser.parse_args()
 | 
						|
    if not args:
 | 
						|
        args = ['.']
 | 
						|
 | 
						|
    if ast is None:
 | 
						|
        parser.error('Python 2.6 or later is required to run the upgrade script.\n'
 | 
						|
                     'The runtime requirements for Flask 0.7 however are still '
 | 
						|
                     'Python 2.5.')
 | 
						|
 | 
						|
    for path in args:
 | 
						|
        scan_path(path, teardown=not options.no_teardown)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    main()
 |