# • Facebook
#   • Twitter
#   • Reddit
#   • StumbleUpon
#   • Digg
#   • email

# Simple CE synchronisation utility with Python features.
 
import wincerapi
import win32api
import win32file
import getopt
import sys
import os
import string
import win32con
import fnmatch
 
class InvalidUsage(Exception):
    """Signals that a command was given arguments it cannot accept."""
 
def print_error(api_exc, msg):
    hr, fn, errmsg = api_exc
    print "%s - %s(%d)" % (msg, errmsg, hr)
 
# Fetch the attributes of `file` through win32api when `local` is true,
# otherwise through the CE remote API.
def GetFileAttributes(file, local=1):
    if not local:
        return wincerapi.CeGetFileAttributes(file)
    return win32api.GetFileAttributes(file)
 
# Enumerate the entries matching `spec` through win32api when `local` is
# true, otherwise through the CE remote API.
def FindFiles(spec, local=1):
    if not local:
        return wincerapi.CeFindFiles(spec)
    return win32api.FindFiles(spec)
 
# Return a true value when `name` has the directory attribute set, and 0
# when its attributes cannot be read.  The true value is the masked
# attribute bit itself, not necessarily 1.
def isdir(name, local=1):
    try:
        attributes = GetFileAttributes(name, local)
        directory_bit = attributes & win32con.FILE_ATTRIBUTE_DIRECTORY
    except win32api.error:
        # An unreadable name is simply reported as "not a directory".
        return 0
    return directory_bit
 
# Copy the local file `src_name` to `dest_name` on the CE device.
#
#   src_name  - path of the local file to read.
#   dest_name - path of the remote file to write.
#   progress  - optional callable; invoked after every chunk with the
#               running total of bytes written so far.
#
# Returns the total number of bytes copied.  Both handles are closed even
# if reading or writing raises.
# (Deliberately a comment, not a docstring: DumpCommands lists every
# module-level function that has a docstring as a command.)
def CopyFileToCe(src_name, dest_name, progress = None):
    sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    total = 0
    try:
        # CREATE_ALWAYS truncates an existing destination.  OPEN_ALWAYS
        # would keep the old contents, so overwriting a longer file with a
        # shorter one left stale bytes at the end.
        dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.CREATE_ALWAYS, 0, None)
        try:
            while 1:
                hr, data = win32file.ReadFile(sh, 2048)
                if not data:
                    # An empty read marks the end of the source file.
                    break
                wincerapi.CeWriteFile(dh, data)
                total = total + len(data)
                if progress is not None: progress(total)
        finally:
            dh.Close()
    finally:
        sh.Close()
    return total
 
# Collect the entries selected by `spec`.
#
#   spec          - a directory (meaning everything in it) or a path whose
#                   last component is a wildcard pattern.
#   local         - true to scan the desktop, false to scan the CE device.
#   recurse       - true to descend into sub-directories.
#   filter        - callable(full_name, rel_name, info, local, filter_args);
#                   an entry is kept only when it returns a non-None value.
#   filter_args   - passed through to `filter` unchanged.
#   recursed_path - relative path accumulated while recursing.
#
# Returns a list of (full_name, info, filter_result) tuples.
# (Deliberately a comment, not a docstring: DumpCommands lists every
# module-level function that has a docstring as a command.)
def BuildFileList(spec, local, recurse, filter, filter_args, recursed_path = ""):
    if isdir(spec, local):
        path, raw_spec = spec, "*"
    else:
        path, raw_spec = os.path.split(spec)

    # When recursing, a full scan is needed so that sub-directories are
    # seen even if their names do not match the pattern.
    if recurse:
        scan_spec = os.path.join(path, "*")
    else:
        scan_spec = os.path.join(path, raw_spec)

    matches = []
    for info in FindFiles(scan_spec, local):
        entry_name = str(info[8])
        full_name = os.path.join(path, entry_name)
        if local: # Can't do this for CE!
            full_name = win32api.GetFullPathName(full_name)
        rel_name = os.path.join(recursed_path, entry_name)

        if isdir(full_name, local) and recurse and entry_name not in ['.','..']:
            sub_spec = os.path.join(full_name, raw_spec)
            matches.extend(BuildFileList(sub_spec, local, 1, filter, filter_args, rel_name))

        # Directories are offered to the filter too; it decides their fate.
        if not fnmatch.fnmatch(entry_name, raw_spec):
            continue
        filter_data = filter( full_name, rel_name, info, local, filter_args )
        if filter_data is not None:
            matches.append( (full_name, info, filter_data) )
    return matches
 
# Filter used by copy(): directories are rejected (None); files yield the
# name to use at the destination - the relative path when bMaintainDir is
# true, otherwise just its last component.
def _copyfilter(full_name, rel_name, info, local, bMaintainDir):
    if isdir(full_name, local):
        return None
    if not bMaintainDir:
        head, tail = os.path.split(rel_name)
        return tail
    return rel_name
 
import pywin.dialogs.status, win32ui
class FileCopyProgressDialog(pywin.dialogs.status.CStatusProgressDialog):
    """Status dialog whose progress position is driven by a byte count."""
    def CopyProgress(self, bytes):
        """Progress callback: set the dialog position to bytes/1024."""
        kilobytes = bytes/1024
        self.Set(kilobytes)
 
def copy( args ):
    """copy src [src ...],  dest
    Copy files to/from the CE device
    """
    # The docstring above is printed as usage text by main() and
    # DumpCommands(), so it is left exactly as it was.
    #
    # Options parsed from `args`:
    #   -r  recurse into sub-directories of each source
    #   -v  print each file name and byte count as it is copied
    # The last remaining argument is the destination, all others are
    # sources.  Raises InvalidUsage for unknown options or when fewer
    # than two names are given.
    bRecurse = bVerbose = 0
    bMaintainDir = 1
    try:
        opts, args = getopt.getopt(args, "rv")
    except getopt.error, details:
        raise InvalidUsage(details)
    for o, v in opts:
        if o=="-r":
            # Was misspelt "bRecuse", so -r never had any effect.
            bRecurse=1
        elif o=='-v':
            bVerbose=1

    if len(args)<2:
        raise InvalidUsage("Must specify a source and destination")

    src = args[:-1]
    dest = args[-1]
    # A leading "WCE:" on the first source means "copy from the device";
    # anything else (including no prefix at all) means copy to the device.
    # NOTE(review): the prefix only selects the direction here; names are
    # passed on with it still attached - confirm that is intended.
    if string.find(src[0], "WCE:")==0:
        bToDevice = 0
    else:
        bToDevice = 1

    if not isdir(dest, not bToDevice):
        # The original format string was printed without its argument.
        print "%s does not indicate a directory" % dest

    files = [] # List of (full_src_name, find_info, dest_relative_name)
    num_files = 0
    num_bytes = 0
    dialog = FileCopyProgressDialog("Copying files")
    dialog.CreateWindow(win32ui.GetMainFrame())
    try:
        # Only the copy-to-device direction is implemented below; when
        # bToDevice is false no files are transferred.
        if bToDevice:
            for spec in src:
                new = BuildFileList(spec, 1, bRecurse, _copyfilter, bMaintainDir)
                if not new:
                    print "Warning: '%s' did not match any files" % (spec)
                files.extend(new)

            for full_src, src_info, dest_info in files:
                dest_name = os.path.join(dest, dest_info)
                size = src_info[5]
                print "Size=", size
                if bVerbose:
                    print full_src, "->", dest_name,"- ",
                dialog.SetText(dest_name)
                # Progress is tracked in units of 1024 bytes.
                dialog.Set(0, size/1024)
                copied = CopyFileToCe(full_src, dest_name, dialog.CopyProgress)
                num_bytes = num_bytes + copied
                if bVerbose:
                    print copied, "bytes"
                num_files = num_files + 1
    finally:
        # Close the progress dialog even if a copy fails part-way.
        dialog.Close()
    print "%d files copied (%d bytes)" % (num_files, num_bytes)
 
def _dirfilter(*args):
    return args[1]
 
def dir(args):
    """dir directory_name ...
    Perform a directory listing on the remote device
    """
    # Option -r makes the listing recurse into sub-directories.
    try:
        opts, args = getopt.getopt(args, "r")
    except getopt.error, details:
        raise InvalidUsage(details)
    bRecurse = 0
    for option, value in opts:
        if option=="-r":
            bRecurse=1

    for directory in args:
        print "Directory of WCE:%s" % directory
        entries = BuildFileList(directory, 0, bRecurse, _dirfilter, None)
        byte_total = 0
        for full_name, info, rel_name in entries:
            stamp = info[3].Format("%d-%b-%Y %H:%M")
            # Directories get a marker; files get blank padding of equal width.
            if info[0] & win32con.FILE_ATTRIBUTE_DIRECTORY:
                marker = "<DIR>"
            else:
                marker = "     "
            size = info[5]
            print "%s  %s %10d %s" % (stamp, marker, size, rel_name)
            byte_total = byte_total + size
        print " " * 14 + "%3d files, %10d bytes" % (len(entries), byte_total)
 
def run(args):
    """run program [args]
    Starts the specified program on the remote device.
    """
    quoted = []
    for arg in args:
        # Anything containing a space is wrapped in double quotes before
        # the pieces are joined into a single string.
        if " " in arg:
            arg = '"' + arg + '"'
        quoted.append(arg)
    command_line = string.join(quoted, " ")
    wincerapi.CeCreateProcess(command_line, "", None, None, 0, 0, None, "", None)
 
def delete(args):
    """delete file, ...
    Delete one or more remote files
    """
    for arg in args:
        try:
            wincerapi.CeDeleteFile(arg)
            print "Deleted: %s" % arg
        except win32api.error, details:
            print_error(details, "Error deleting '%s'" % arg)
 
def DumpCommands():
    print "%-10s - %s" % ("Command", "Description")
    print "%-10s - %s" % ("-------", "-----------")
    for name, item in globals().items():
        if type(item)==type(DumpCommands):
            doc = getattr(item, "__doc__", "")
            if doc:
                lines = string.split(doc, "\n")
                print "%-10s - %s" % (name, lines[0])
                for line in lines[1:]:
                    if line:
                        print " " * 8, line
 
def main():
    if len(sys.argv)<2:
        print "You must specify a command!"
        DumpCommands()
        return
    command = sys.argv[1]
    fn = globals().get(command)
    if fn is None:
        print "Unknown command:", command
        DumpCommands()
        return
 
    wincerapi.CeRapiInit()
    try:
        verinfo = wincerapi.CeGetVersionEx()
        print "Connected to device, CE version %d.%d %s" % (verinfo[0], verinfo[1], verinfo[4])
        try:
            fn(sys.argv[2:])
        except InvalidUsage, msg:
            print "Invalid syntax -", msg
            print fn.__doc__
 
    finally:
        try:
            wincerapi.CeRapiUninit()
        except win32api.error, details:
            print_error(details, "Error disconnecting")
 
# Run the command-line interface only when executed as a script.
if __name__=='__main__':
    main()