summaryrefslogtreecommitdiff
path: root/lib/python2.7/site-packages/pip-0.8.1-py2.7.egg/pip/vcs/git.py
blob: 0701e49ec322cf488219c888862cd1864d1ef8ad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import os
import shutil
import tempfile
import re
from pip import call_subprocess
from pip.util import display_path
from pip.vcs import vcs, VersionControl
from pip.log import logger
from urllib import url2pathname
from urlparse import urlsplit, urlunsplit


class Git(VersionControl):
    name = 'git'
    dirname = '.git'
    repo_name = 'clone'
    schemes = ('git', 'git+http', 'git+ssh', 'git+git', 'git+file')
    bundle_file = 'git-clone.txt'
    guide = ('# This was a Git repo; to make it a repo again run:\n'
        'git init\ngit remote add origin %(url)s -f\ngit checkout %(rev)s\n')

    def __init__(self, url=None, *args, **kwargs):

        # Works around an apparent Git bug
        # (see http://article.gmane.org/gmane.comp.version-control.git/146500)
        if url:
            scheme, netloc, path, query, fragment = urlsplit(url)
            if scheme.endswith('file'):
                initial_slashes = path[:-len(path.lstrip('/'))]
                newpath = initial_slashes + url2pathname(path).replace('\\', '/').lstrip('/')
                url = urlunsplit((scheme, netloc, newpath, query, fragment))
                after_plus = scheme.find('+')+1
                url = scheme[:after_plus]+ urlunsplit((scheme[after_plus:], netloc, newpath, query, fragment))

        super(Git, self).__init__(url, *args, **kwargs)

    def parse_vcs_bundle_file(self, content):
        url = rev = None
        for line in content.splitlines():
            if not line.strip() or line.strip().startswith('#'):
                continue
            url_match = re.search(r'git\s*remote\s*add\s*origin(.*)\s*-f', line)
            if url_match:
                url = url_match.group(1).strip()
            rev_match = re.search(r'^git\s*checkout\s*-q\s*(.*)\s*', line)
            if rev_match:
                rev = rev_match.group(1).strip()
            if url and rev:
                return url, rev
        return None, None

    def unpack(self, location):
        """Clone the Git repository at the url to the destination location"""
        url, rev = self.get_url_rev()
        logger.notify('Cloning Git repository %s to %s' % (url, location))
        logger.indent += 2
        try:
            if os.path.exists(location):
                os.rmdir(location)
            call_subprocess(
                [self.cmd, 'clone', url, location],
                filter_stdout=self._filter, show_stdout=False)
        finally:
            logger.indent -= 2

    def export(self, location):
        """Export the Git repository at the url to the destination location"""
        temp_dir = tempfile.mkdtemp('-export', 'pip-')
        self.unpack(temp_dir)
        try:
            if not location.endswith('/'):
                location = location + '/'
            call_subprocess(
                [self.cmd, 'checkout-index', '-a', '-f', '--prefix', location],
                filter_stdout=self._filter, show_stdout=False, cwd=temp_dir)
        finally:
            shutil.rmtree(temp_dir)

    def check_rev_options(self, rev, dest, rev_options):
        """Check the revision options before checkout to compensate that tags
        and branches may need origin/ as a prefix.
        Returns the SHA1 of the branch or tag if found.
        """
        revisions = self.get_tag_revs(dest)
        revisions.update(self.get_branch_revs(dest))
        inverse_revisions = dict((v, k) for k, v in revisions.iteritems())
        # Check if rev is a branch name
        origin_rev = 'origin/%s' % rev
        if origin_rev in inverse_revisions:
            return [inverse_revisions[origin_rev]]
        elif rev in inverse_revisions:
            return [inverse_revisions[rev]]
        else:
            logger.warn("Could not find a tag or branch '%s', assuming commit." % rev)
            return rev_options

    def switch(self, dest, url, rev_options):
        call_subprocess(
            [self.cmd, 'config', 'remote.origin.url', url], cwd=dest)
        call_subprocess(
            [self.cmd, 'checkout', '-q'] + rev_options, cwd=dest)

    def update(self, dest, rev_options):
        call_subprocess([self.cmd, 'pull', '-q'], cwd=dest)
        call_subprocess(
            [self.cmd, 'checkout', '-q', '-f'] + rev_options, cwd=dest)

    def obtain(self, dest):
        url, rev = self.get_url_rev()
        if rev:
            rev_options = [rev]
            rev_display = ' (to %s)' % rev
        else:
            rev_options = ['master']
            rev_display = ''
        if self.check_destination(dest, url, rev_options, rev_display):
            logger.notify('Cloning %s%s to %s' % (url, rev_display, display_path(dest)))
            call_subprocess([self.cmd, 'clone', '-q', url, dest])
            if rev:
                rev_options = self.check_rev_options(rev, dest, rev_options)
                # Only do a checkout if rev_options differs from HEAD
                if not self.get_revision(dest).startswith(rev_options[0]):
                    call_subprocess([self.cmd, 'checkout', '-q'] + rev_options, cwd=dest)

    def get_url(self, location):
        url = call_subprocess(
            [self.cmd, 'config', 'remote.origin.url'],
            show_stdout=False, cwd=location)
        return url.strip()

    def get_revision(self, location):
        current_rev = call_subprocess(
            [self.cmd, 'rev-parse', 'HEAD'], show_stdout=False, cwd=location)
        return current_rev.strip()

    def get_tag_revs(self, location):
        tags = call_subprocess(
            [self.cmd, 'tag', '-l'],
            show_stdout=False, raise_on_returncode=False, cwd=location)
        tag_revs = []
        for line in tags.splitlines():
            tag = line.strip()
            rev = call_subprocess(
                [self.cmd, 'rev-parse', tag], show_stdout=False, cwd=location)
            tag_revs.append((rev.strip(), tag))
        tag_revs = dict(tag_revs)
        return tag_revs

    def get_branch_revs(self, location):
        branches = call_subprocess(
            [self.cmd, 'branch', '-r'], show_stdout=False, cwd=location)
        branch_revs = []
        for line in branches.splitlines():
            line = line.split('->')[0].strip()
            branch = "".join([b for b in line.split() if b != '*'])
            rev = call_subprocess(
                [self.cmd, 'rev-parse', branch], show_stdout=False, cwd=location)
            branch_revs.append((rev.strip(), branch))
        branch_revs = dict(branch_revs)
        return branch_revs

    def get_src_requirement(self, dist, location, find_tags):
        repo = self.get_url(location)
        if not repo.lower().startswith('git:'):
            repo = 'git+' + repo
        egg_project_name = dist.egg_name().split('-', 1)[0]
        if not repo:
            return None
        current_rev = self.get_revision(location)
        tag_revs = self.get_tag_revs(location)
        branch_revs = self.get_branch_revs(location)

        if current_rev in tag_revs:
            # It's a tag
            full_egg_name = '%s-%s' % (egg_project_name, tag_revs[current_rev])
        elif (current_rev in branch_revs and
              branch_revs[current_rev] != 'origin/master'):
            # It's the head of a branch
            full_egg_name = '%s-%s' % (dist.egg_name(),
                                       branch_revs[current_rev].replace('origin/', ''))
        else:
            full_egg_name = '%s-dev' % dist.egg_name()

        return '%s@%s#egg=%s' % (repo, current_rev, full_egg_name)

    def get_url_rev(self):
        """
        Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
        That's required because although they use SSH they sometimes doesn't
        work with a ssh:// scheme (e.g. Github). But we need a scheme for
        parsing. Hence we remove it again afterwards and return it as a stub.
        """
        if not '://' in self.url:
            assert not 'file:' in self.url
            self.url = self.url.replace('git+', 'git+ssh://')
            url, rev = super(Git, self).get_url_rev()
            url = url.replace('ssh://', '')
        else:
            url, rev = super(Git, self).get_url_rev()

        return url, rev


vcs.register(Git)