- 论坛徽章:
- 0
|
曾写过一个功能类似的程序,支持多 sheet 比较、注释和颜色标记,也分享出来。
- __author__ = 'luhongc'
- import xlrd
- import xlwt
- import xlutils.copy
def compare(oldFile, newFile, outputFile, comments=True,
            comment_pattern="DPL Comment", table_split=r"^Table\s*\d+$",
            max_columns=30):
    """Compare two workbooks sheet by sheet and save an annotated copy.

    This is a generator: it yields human-readable progress messages while it
    works, and the output workbook is only saved once the generator has been
    fully consumed.

    Args:
        oldFile: path of the old workbook.
        newFile: path of the new workbook; the output is a copy of this one.
        outputFile: path the annotated copy is saved to.
        comments: forwarded to GDOcomMain (whether old comments are carried
            over into the output).
        comment_pattern: header text that marks the comment column.
        table_split: regular expression marking a table boundary cell.
        max_columns: row width used when building comparison keys.

    Yields:
        Progress and warning messages as text.
    """
    def get_sheet_by_name(book, name):
        """Return the sheet of an xlwt workbook called *name*, or None."""
        # Sheets are probed by index until get_sheet() raises IndexError,
        # which is taken as the end of the workbook.
        import itertools
        try:
            for idx in itertools.count():
                sheet = book.get_sheet(idx)
                if sheet.name == name:
                    return sheet
        except IndexError:
            return None

    oldwb = xlrd.open_workbook(oldFile)
    # NOTE(review): formatting_info=True is presumably needed so that the
    # copy made below keeps the cell formatting of the new file - confirm.
    newwb = xlrd.open_workbook(newFile, formatting_info=True)
    yield "Pairing the files....\n"
    paired, OldExtra, NewExtra = pairFiles(oldwb, newwb)
    # The templates are unicode literals: on Python 2 a byte-string template
    # fails with UnicodeEncodeError for non-ASCII sheet names.
    # The names are sorted so the warnings are deterministic.
    if OldExtra:
        sheets = u"\n".join(sorted(OldExtra))
        yield u"Warning: Sheet \n {}\nare only in Old File!\n".format(sheets)
    if NewExtra:
        sheets = u"\n".join(sorted(NewExtra))
        yield u"Warning: Sheet \n {}\nare only in New File!\n".format(sheets)
    yield "\nComparing the files....\n"
    outwb = xlutils.copy.copy(newwb)
    outwb.set_colour_RGB(0x2A, 100, 200, 100)  # set light green color for output

    # paired maps each new sheet name to its old sheet name (None when the
    # sheet exists only in the new file).
    for new_name, old_name in paired.items():
        if old_name is not None:
            sheetOld = oldwb.sheet_by_name(old_name)
        else:
            sheetOld = None
        sheetNew = newwb.sheet_by_name(new_name)
        sheetOut = get_sheet_by_name(outwb, new_name)
        for txt in GDOcomMain(new_name, sheetOut, sheetNew, sheetOld, comments,
                              comment_pattern, table_split, max_columns):
            yield txt
    outwb.save(outputFile)
def GDOcomMain(sheetName, sheetOut, sheetNew, sheetOld, comments,
               comment_pattern, table_split, max_columns):
    """Compare one sheet pair and annotate the output sheet.

    Generator yielding progress messages. When sheetOld is None the sheet
    exists only in the new workbook and is handed to copyNew; otherwise the
    old sheet is indexed with readOld and the new sheet is checked against
    that index by typeCompare.

    Args:
        sheetName: name of the sheet, used in the messages.
        sheetOut: writable output sheet that receives the annotations.
        sheetNew: sheet read from the new workbook.
        sheetOld: matching sheet from the old workbook, or None.
        comments, comment_pattern, table_split, max_columns: forwarded to
            readOld / typeCompare.
    """
    # Unicode template: on Python 2 a byte-string template raises
    # UnicodeEncodeError when the sheet name contains non-ASCII characters.
    yield u"Comparing Sheet {}.".format(sheetName)

    if sheetOld is None:  # sheet exists only in the new workbook
        for txt in copyNew(sheetNew, sheetOut):
            yield txt
        return

    # Read the data pool from the old sheet.
    has_comment, datapool = readOld(sheetOld, comment_pattern, table_split,
                                    max_columns)
    # readOld closes a part at every table-marker row and always appends one
    # final part, so with more than one part the count is one too high.
    nTable = len(datapool)
    if nTable > 1:
        nTable -= 1
    yield sheetName + " has " + str(nTable) + " table(s)."

    for txt in typeCompare(sheetName, sheetOut, sheetNew, datapool,
                           has_comment, table_split, comment_pattern,
                           comments, max_columns):
        yield txt
def hash_key(thisrow, comment_column):
    """Build the comparison key of a row.

    The key is made of the cells from the start of the row up to and
    including index ``comment_column``, joined with ``|``.

    Args:
        thisrow: list of cell texts.
        comment_column: index of the last cell that belongs to the key.

    Returns:
        The joined key as text.
    """
    key_end = comment_column + 1
    key_cells = thisrow[:key_end]
    return u"|".join(key_cells)
def readOld(sheet, comment_pattern, table_split, max_columns):
    """Index the rows of the old sheet, one pool per table part.

    The sheet is cut into parts: a row containing a cell that matches
    ``table_split`` ends the current part (that row is still included in it).
    Within a part, a cell equal to ``comment_pattern`` (case-insensitive)
    marks the comment column.

    Args:
        sheet: sheet object exposing ``nrows``, ``ncols``, ``name`` and
            ``cell(row, col).value``.
        comment_pattern: header text of the comment column.
        table_split: regular expression matched against every stripped cell.
        max_columns: width every row is padded to before keys are built.

    Returns:
        ``(has_comment, datapool)``. ``has_comment`` is True when any comment
        header was found. ``datapool`` is a list of ``(pool, commentTag)``
        tuples, one per part; ``pool`` maps row key -> comment text ("" when
        the part has no comment column) and ``commentTag`` is the comment
        column index or None.

    Raises:
        IndexError: the sheet has more columns than ``max_columns``.
        AssertionError: comment headers of one part sit in different columns.
    """
    import re

    # Same exception type the row fill would raise, but with a usable message.
    if sheet.ncols > max_columns:
        raise IndexError(
            u"Sheet {} has {} columns but max_columns is {}".format(
                sheet.name, sheet.ncols, max_columns))

    splitter = re.compile(table_split, re.UNICODE)  # split pattern
    wanted = comment_pattern.upper()
    datapool = []   # one (pool, commentTag) per part
    bufs = []       # rows of each part, parallel to datapool
    has_comment = False
    buf = []
    commentTag = None

    for row in range(sheet.nrows):
        thisrow = [u''] * max_columns  # pad the row to max_columns
        newpart = False
        for col in range(sheet.ncols):
            # u"{}".format() gives text on both Python 2 and Python 3.
            v = u"{}".format(sheet.cell(row, col).value).strip()
            if splitter.match(v):
                newpart = True  # this row closes the current part
            if v.upper() == wanted:  # comment header found
                # Raised explicitly so the check survives `python -O`.
                if commentTag is not None and col != commentTag:
                    raise AssertionError(
                        u"Comments should in the same column in old Sheet {}"
                        .format(sheet.name))
                commentTag = col
                has_comment = True
            thisrow[col] = v
        buf.append(thisrow)
        if newpart:
            datapool.append(({}, commentTag))
            bufs.append(buf)
            commentTag = None
            buf = []

    # The last part (rows after the final table marker).
    datapool.append(({}, commentTag))
    bufs.append(buf)

    # Fill the pools once each part's comment column is known.
    for (pool, Tag), rows in zip(datapool, bufs):
        for thisrow in rows:
            if Tag is None:
                # No comment column: the whole row is the key.
                pool[hash_key(thisrow, len(thisrow) - 1)] = ""
            else:
                # Key on the cells left of the comment column.
                pool[hash_key(thisrow, Tag - 1)] = thisrow[Tag]
    return has_comment, datapool
def copyNew(sheetNew, sheetOut, color='yellow'):
    """Annotate a sheet that exists only in the new workbook.

    Every cell of ``sheetNew`` whose value is the empty string is rewritten
    in ``sheetOut`` with a solid background of ``color``; other cells are
    left untouched.

    Args:
        sheetNew: sheet read from the new workbook.
        sheetOut: writable output sheet.
        color: colour name placed in the style's ``fore_colour``.

    Yields:
        A single status message.
    """
    # The style does not depend on the cell, so build it once instead of
    # once per cell.
    # NOTE(review): xlwt is reported to cap the number of distinct style
    # records per workbook; reusing one object avoids inflating it - confirm.
    style = xlwt.easyxf(
        'pattern: pattern solid, fore_colour {}'.format(color))
    for row in range(sheetNew.nrows):
        for col in range(sheetNew.ncols):
            v = sheetNew.cell(row, col).value
            if v == u"":  # only change the null cells
                sheetOut.write(row, col, v, style)
    # Unicode template: on Python 2 a byte-string template raises
    # UnicodeEncodeError for non-ASCII sheet names.
    yield u"Sheet {} is new one and annotated in YELLOW.\n".format(sheetOut.name)
def typeCompare(sheetName, sheetOut, sheetNew, datapool, has_comment,
                table_split, comment_pattern, comments, max_columns,
                color='0x2A'):
    """Compare the new sheet against the old data pool and annotate output.

    The new sheet is cut into parts exactly like the old one (a row with a
    cell matching ``table_split`` ends a part). Each row is then keyed with
    hash_key and looked up in the pool of the same part:

    * key found: the old comment is written into the comment column of
      ``sheetOut`` (only when the part has a comment column and ``comments``
      is true);
    * key missing: the row is new, and its cells are rewritten in
      ``sheetOut`` with a highlighted style. Cells whose value converts to a
      float strictly between 10000 and 99999 are treated as possible dates
      and skipped.

    Args:
        sheetName: sheet name used in error messages.
        sheetOut: writable output sheet.
        sheetNew: sheet read from the new workbook.
        datapool: list of ``(pool, commentTag)`` as returned by readOld.
        has_comment: accepted for interface compatibility; not used here.
        table_split: regular expression marking a table boundary cell.
        comment_pattern: header text of the comment column.
        comments: whether old comments are copied into the output.
        max_columns: width every row is padded to before keys are built.
        color: colour placed in the highlight style's ``fore_colour``.

    Yields:
        A single status message.

    Raises:
        ValueError: the new sheet has more table markers than the old one.
        AssertionError: the part counts differ, or a comment header sits in
            a different column than in the old sheet.
        IndexError: the sheet has more columns than ``max_columns``.
    """
    import re

    splitter = re.compile(table_split, re.UNICODE)  # table split pattern
    wanted = comment_pattern.upper()
    # Unicode templates: on Python 2 byte-string templates raise
    # UnicodeEncodeError for non-ASCII sheet names.
    mismatch = u"Sheet {} has different number of tables".format(sheetName)

    # Pass 1: split the new sheet into parts of row numbers.
    recordpool = []
    record = []
    partid = 0
    commentTag = datapool[partid][1]
    for row in range(sheetNew.nrows):
        newpart = False
        for col in range(sheetNew.ncols):
            # u"{}".format() gives text on both Python 2 and Python 3.
            v = u"{}".format(sheetNew.cell(row, col).value).strip()
            if splitter.match(v):
                newpart = True
            # Raised explicitly so the check survives `python -O`.
            if v.upper() == wanted and col != commentTag:
                raise AssertionError(
                    u"Comments should in the same column in new Sheet {}"
                    .format(sheetNew.name))
        record.append(row)
        if newpart:
            recordpool.append((record, commentTag))
            record = []
            partid += 1
            try:
                commentTag = datapool[partid][1]
            except IndexError:
                # More table markers than the old sheet had parts.
                raise ValueError(mismatch)
    recordpool.append((record, commentTag))
    if len(recordpool) != len(datapool):
        raise AssertionError(mismatch)

    # Same exception type the row fill would raise, but with a usable message.
    if sheetNew.ncols > max_columns:
        raise IndexError(
            u"Sheet {} has {} columns but max_columns is {}".format(
                sheetNew.name, sheetNew.ncols, max_columns))

    # The highlight style is cell-independent: build it once, not per cell.
    style = xlwt.easyxf(
        'pattern: pattern solid, fore_colour {}; align: horiz center; '
        'border:top thin,bottom thin;'.format(color))

    # Pass 2: look every row up in the pool of its part.
    for partid, (record, Tag) in enumerate(recordpool):
        pool, oldTag = datapool[partid]
        for row in record:
            values = [sheetNew.cell(row, col).value
                      for col in range(sheetNew.ncols)]
            thisrow = [u''] * max_columns  # pad the row to max_columns
            for col, value in enumerate(values):
                thisrow[col] = u"{}".format(value).strip()

            if Tag is None:
                # No comment column: the whole row is the key.
                key = hash_key(thisrow, len(thisrow) - 1)
            else:
                # Key on the cells left of the comment column.
                key = hash_key(thisrow, Tag - 1)

            if key in pool:
                # Known row: carry the old comment over.
                if oldTag is not None and comments:
                    sheetOut.write(row, oldTag, pool[key])
                continue

            # New row: highlight every cell that is not a possible date.
            for col, v in enumerate(values):
                try:
                    is_date = 10000.0 < float(v) < 99999.0
                except (TypeError, ValueError):
                    is_date = False
                if not is_date:
                    sheetOut.write(row, col, v, style)

    yield u"Sheet {} is OK. New items are annotated in GREEN\n".format(sheetOut.name)
def pairFiles(oldwb, newwb):
    """Pair the sheets of two workbooks by their whitespace-stripped names.

    Args:
        oldwb: old workbook; must provide ``sheet_names()``.
        newwb: new workbook; must provide ``sheet_names()``.

    Returns:
        ``(paired, oldExtra, newExtra)`` where ``paired`` maps every sheet
        name of the new workbook to the matching sheet name of the old
        workbook (None when there is no match), and ``oldExtra`` /
        ``newExtra`` are sets of stripped names found only in the old / new
        workbook.
    """
    # stripped name -> original name, so names differing only in surrounding
    # whitespace still pair up
    newSheets = {name.strip(): name for name in newwb.sheet_names()}
    oldSheets = {name.strip(): name for name in oldwb.sheet_names()}

    oldExtra = set(oldSheets) - set(newSheets)
    newExtra = set(newSheets) - set(oldSheets)

    # .items() works on Python 2 and 3 (iteritems() is Python 2 only);
    # dict.get() yields None for sheets without an old counterpart.
    paired = {original: oldSheets.get(stripped)
              for stripped, original in newSheets.items()}
    return paired, oldExtra, newExtra
if __name__ == "__main__":
    # Command-line entry point: OLD_FILE NEW_FILE OUTPUT_FILE.
    import sys
    if len(sys.argv) != 4:
        # Exit with a usage line instead of a bare unpacking ValueError.
        sys.exit("Usage: {} OLD_FILE NEW_FILE OUTPUT_FILE".format(sys.argv[0]))
    oldD, newD, outputD = sys.argv[1:]
    # compare() is a generator; consuming it drives the comparison and the
    # final save. print(x) with one argument works on Python 2 and 3.
    for txt in compare(oldD, newD, outputD):
        print(txt)
复制代码 |
|