Did I find the right examples for you? yes no      Crawl my project      Python Jobs

All Samples(702)  |  Call(702)  |  Derive(0)  |  Import(0)
Runs the gsutil command.

Args:
  cmd: The command to run, as a list, e.g. ['cp', 'foo', 'bar']
  return_status: If True, the exit status code is returned.
  return_stdout: If True, the standard output of the command is returned.
  return_stderr: If True, the standard error of the command is returned.
  expected_status: The expected return code. If not specified, defaults to
                   0. If the return code is a different value, an exception
                   is raised.(more...)

src/g/s/gsutil-3.42/gslib/tests/test_acl.py   gsutil(Download)
  def test_set_invalid_acl_object(self):
    """Ensures that invalid XML content returns a MalformedACLError."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile(contents='badXml')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri],
  def test_set_invalid_acl_bucket(self):
    """Ensures that invalid XML content returns a MalformedACLError."""
    bucket_uri = suri(self.CreateBucket())
    inpath = self.CreateTempFile(contents='badXml')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri],
  def test_set_valid_acl_object(self):
    """Ensures that valid canned and XML ACLs work with get/set."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                return_stdout=True)
    inpath = self.CreateTempFile(contents=acl_string)
    self.RunGsUtil(self._set_acl_prefix + ['public-read', obj_uri])
    acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri],

src/g/s/gsutil-HEAD/gslib/tests/test_acl.py   gsutil(Download)
  def test_set_invalid_acl_object(self):
    """Ensures that invalid XML content returns a MalformedACLError."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile(contents='badXml')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri],
  def test_set_invalid_acl_bucket(self):
    """Ensures that invalid XML content returns a MalformedACLError."""
    bucket_uri = suri(self.CreateBucket())
    inpath = self.CreateTempFile(contents='badXml')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri],
  def test_set_valid_acl_object(self):
    """Ensures that valid canned and XML ACLs work with get/set."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                return_stdout=True)
    inpath = self.CreateTempFile(contents=acl_string)
    self.RunGsUtil(self._set_acl_prefix + ['public-read', obj_uri])
    acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri],

src/g/s/gsutil-3.42/gslib/tests/test_cp.py   gsutil(Download)
  def test_noclobber(self):
    key_uri = self.CreateObject(contents='foo')
    fpath = self.CreateTempFile(contents='bar')
    stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)],
                            return_stderr=True)
    self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr)
    self.assertEqual(key_uri.get_contents_as_string(), 'foo')
    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath],
  def test_copy_in_cloud_noclobber(self):
    bucket1_uri = self.CreateBucket()
    bucket2_uri = self.CreateBucket()
    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
    stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)],
                            return_stderr=True)
    self.assertEqual(stderr.count('Copying'), 1)
    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)],
  def _run_streaming_test(self, provider):
    bucket_uri = self.CreateBucket(provider=provider)
    stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')],
                            stdin='bar', return_stderr=True)
    self.assertIn('Copying from <STDIN>', stderr)

src/g/s/gsutil-HEAD/gslib/tests/test_cp.py   gsutil(Download)
  def test_noclobber(self):
    key_uri = self.CreateObject(contents='foo')
    fpath = self.CreateTempFile(contents='bar')
    stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)],
                            return_stderr=True)
    self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr)
    self.assertEqual(key_uri.get_contents_as_string(), 'foo')
    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath],
  def test_copy_in_cloud_noclobber(self):
    bucket1_uri = self.CreateBucket()
    bucket2_uri = self.CreateBucket()
    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
    stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)],
                            return_stderr=True)
    self.assertEqual(stderr.count('Copying'), 1)
    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)],
  def _run_streaming_test(self, provider):
    bucket_uri = self.CreateBucket(provider=provider)
    stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')],
                            stdin='bar', return_stderr=True)
    self.assertIn('Copying from <STDIN>', stderr)

src/g/s/gsutil-3.42/gslib/tests/test_ls.py   gsutil(Download)
  def test_blank_ls(self):
    self.RunGsUtil(['ls'])
 
  def test_empty_bucket(self):
    bucket_uri = self.CreateBucket()
    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _Check1():
      stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-b', suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual('%s/\n' % suri(bucket_uri), stdout)
    _Check1()
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-Lb', suri(bucket_uri)],
                              return_stdout=True)
      self.assertIn(suri(bucket_uri), stdout)
      self.assertNotIn('TOTAL:', stdout)
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-lb', suri(bucket_uri)],
                              return_stdout=True)
      self.assertIn(suri(bucket_uri), stdout)
      self.assertNotIn('TOTAL:', stdout)

src/g/s/gsutil-3.42/gslib/tests/test_rm.py   gsutil(Download)
    def _Check1(stderr_lines):
      stderr = self.RunGsUtil(['-m', 'rm', '-a', suri(key_uri)],
                              return_stderr=True)
      stderr_lines.update(set(stderr.splitlines()))
      stderr = '\n'.join(stderr_lines)
    def _Check2():
      stdout = self.RunGsUtil(['ls', '-a', suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual(stdout, '')
    _Check2()
    key_uri.set_contents_from_string('baz')
    g2 = key_uri.generation
    stderr = self.RunGsUtil(['rm', suri(key_uri)], return_stderr=True)
    self.assertEqual(stderr.count('Removing gs://'), 1)
    self.assertIn('Removing %s...' % suri(key_uri), stderr)
    stderr = self.RunGsUtil(['-m', 'rm', '-a', suri(key_uri)],
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-a', suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual(stdout, '')
    _Check1()

src/g/s/gsutil-HEAD/gslib/tests/test_ls.py   gsutil(Download)
  def test_blank_ls(self):
    self.RunGsUtil(['ls'])
 
  def test_empty_bucket(self):
    bucket_uri = self.CreateBucket()
    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _Check1():
      stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-b', suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual('%s/\n' % suri(bucket_uri), stdout)
    _Check1()
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-Lb', suri(bucket_uri)],
                              return_stdout=True)
      self.assertIn(suri(bucket_uri), stdout)
      self.assertNotIn('TOTAL:', stdout)
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-lb', suri(bucket_uri)],
                              return_stdout=True)
      self.assertIn(suri(bucket_uri), stdout)
      self.assertNotIn('TOTAL:', stdout)

src/g/s/gsutil-3.42/gslib/tests/test_setmeta.py   gsutil(Download)
  def test_initial_metadata(self):
    objuri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile()
    ct = 'image/gif'
    self.RunGsUtil(['-h', 'x-goog-meta-xyz:abc', '-h', 'Content-Type:%s' % ct,
                    'cp', inpath, objuri])
    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-L', objuri], return_stdout=True)
  def test_overwrite_existing(self):
    objuri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile()
    self.RunGsUtil(['-h', 'x-goog-meta-xyz:abc', '-h', 'Content-Type:image/gif',
                    'cp', inpath, objuri])
    self.RunGsUtil(['setmeta', '-n', '-h', 'Content-Type:text/html', '-h',
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-L', objuri], return_stdout=True)
      self.assertRegexpMatches(stdout, 'Content-Type:\s+text/html')
      self.assertNotIn('xyz', stdout)
    _Check1()

src/g/s/gsutil-HEAD/gslib/tests/test_rm.py   gsutil(Download)
    def _Check1(stderr_lines):
      stderr = self.RunGsUtil(['-m', 'rm', '-a', suri(key_uri)],
                              return_stderr=True)
      stderr_lines.update(set(stderr.splitlines()))
      stderr = '\n'.join(stderr_lines)
    def _Check2():
      stdout = self.RunGsUtil(['ls', '-a', suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual(stdout, '')
    _Check2()
    key_uri.set_contents_from_string('baz')
    g2 = key_uri.generation
    stderr = self.RunGsUtil(['rm', suri(key_uri)], return_stderr=True)
    self.assertEqual(stderr.count('Removing gs://'), 1)
    self.assertIn('Removing %s...' % suri(key_uri), stderr)
    stderr = self.RunGsUtil(['-m', 'rm', '-a', suri(key_uri)],
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-a', suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual(stdout, '')
    _Check1()

src/g/s/gsutil-HEAD/gslib/tests/test_setmeta.py   gsutil(Download)
  def test_initial_metadata(self):
    objuri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile()
    ct = 'image/gif'
    self.RunGsUtil(['-h', 'x-goog-meta-xyz:abc', '-h', 'Content-Type:%s' % ct,
                    'cp', inpath, objuri])
    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-L', objuri], return_stdout=True)
  def test_overwrite_existing(self):
    objuri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile()
    self.RunGsUtil(['-h', 'x-goog-meta-xyz:abc', '-h', 'Content-Type:image/gif',
                    'cp', inpath, objuri])
    self.RunGsUtil(['setmeta', '-n', '-h', 'Content-Type:text/html', '-h',
    def _Check1():
      stdout = self.RunGsUtil(['ls', '-L', objuri], return_stdout=True)
      self.assertRegexpMatches(stdout, 'Content-Type:\s+text/html')
      self.assertNotIn('xyz', stdout)
    _Check1()

  1 | 2 | 3 | 4 | 5  Next