Source code for transitiveDigraphs

#!/usr/bin/env python3
"""
Digraph3 module for working with transitive digraphs. 
Copyright (C) 2006-2023  Raymond Bisdorff

    This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""
#######################

__version__ = "$Revision: Python 3.10"

from digraphsTools import *
from digraphs import *
from outrankingDigraphs import *
from transitiveDigraphs import *
#from multiprocessing import Process, active_children, cpu_count
import multiprocessing as mp
mpctx = mp.get_context('spawn')
Process = mpctx.Process
active_children = mpctx.active_children
cpu_count = mpctx.cpu_count


[docs] class TransitiveDigraph(Digraph): """ Abstract class for specialized methods addressing transitive digraphs. """ def __init__(self): print('abstract root class')
[docs] def showWeakOrder(self,rankingByChoosing=None): """ A dummy for the showTransitiveDigraph() method. """ self.showTransitiveDigraph()
[docs] def showTransitiveDigraph(self,rankingByChoosing=None,WithCoverCredibility=False): """ A show method for self.rankinByChoosing result. """ if rankingByChoosing is None: try: rankingByChoosing = self.rankingByChoosing['result'] except: #print('Error: You must first run self.computeRankingByChoosing(CoDual=False(default)|True) !') self.computeRankingByChoosing() rankingByChoosing = self.rankingByChoosing['result'] ## else: ## try: ## rankingByChoosing = self.rankingByLastChoosing['result'] ## except: ## #print('Error: You must first run self.computeRankingByChoosing(CoDual=False(default)|True) !') ## self.computeRankingByLastChoosing() ## rankingByChoosing = self.rankingByLastChoosing['result'] else: rankingByChoosing = rankingByChoosing['result'] print('Ranking by Choosing and Rejecting') space = '' n = len(rankingByChoosing) for i in range(n): if i+1 == 1: nstr='st' elif i+1 == 2: nstr='nd' elif i+1 == 3: nstr='rd' else: nstr='th' ibch = set(rankingByChoosing[i][0][1]) iwch = set(rankingByChoosing[i][1][1]) iach = iwch & ibch #print 'ibch, iwch, iach', i, ibch,iwch,iach ch = list(ibch) ch.sort() if WithCoverCredibility: print(' %s%s%s ranked %s (%.2f)' % (space,i+1,nstr,ch,rankingByChoosing[i][0][0])) else: print(' %s%s%s ranked %s' % (space,i+1,nstr,ch) ) if len(iach) > 0 and i < n-1: print(' %s Ambiguous Choice %s' % (space,list(iach))) space += ' ' space += ' ' for i in range(n): if n-i == 1: nstr='st' elif n-i == 2: nstr='nd' elif n-i == 3: nstr='rd' else: nstr='th' space = space[:-2] ibch = set(rankingByChoosing[n-i-1][0][1]) iwch = set(rankingByChoosing[n-i-1][1][1]) iach = iwch & ibch #print 'ibch, iwch, iach', i, ibch,iwch,iach ch = list(iwch) ch.sort() if len(iach) > 0 and i > 0: space = space[:-2] print(' %s Ambiguous Choice %s' % (space,list(iach))) if WithCoverCredibility: print(' %s%s%s last ranked %s (%.2f)' % (space,n-i,nstr,ch,rankingByChoosing[n-i-1][1][0])) else: print(' %s%s%s last ranked %s)' % (space,n-i,nstr,ch) )
[docs] def showRankingByChoosing(self,actionsList=None,rankingByChoosing=None,WithCoverCredibility=False): """ Dummy name for showTransitiveDigraph() method """ self.showTransitiveDigraph(rankingByChoosing=rankingByChoosing, WithCoverCredibility=WithCoverCredibility)
[docs] def showOrderedRelationTable(self,direction="decreasing",originalRelation=False): """ Showing the relation table in decreasing (default) or increasing order. """ actionsList = [] if direction == "decreasing": print('Decreasing Weak Ordering') self.showRankingByBestChoosing() try: ordering = self.rankingByBestChoosing except: ordering = self.computeRankingByBestChoosing(Debug=False) elif direction == "increasing": print('Increasing Weak Ordering') self.showRankingByLastChoosing() try: ordering = self.rankingByLastChoosing except: ordering = self.computeRankingByLastChoosing() else: print('Direction error !: %s is not a correct instruction (decreasing=default or increasing)' % direction) for eq in ordering['result']: #print(eq[1]) eq = eq[1] eq.sort() for x in eq: actionsList.append(x) if len(actionsList) != len(self.actions): print('Error !: missing action(s) %s in ordered table.') if originalRelation: showRelation = self.originalRelation else: showRelation = self.relation Digraph.showRelationTable(self,actionsSubset=actionsList, relation=showRelation, Sorted=False, ReflexiveTerms=False)
[docs] def exportDigraphGraphViz(self,fileName=None, bestChoice=set(),worstChoice=set(),Comments=True,graphType='png',graphSize='7,7'): """ export GraphViz dot file for digraph drawing filtering. """ Digraph.exportGraphViz(self, fileName=fileName, bestChoice=bestChoice, worstChoice=worstChoice, Comments=Comments, graphType=graphType, graphSize=graphSize)
[docs] def exportTopologicalGraphViz(self,fileName=None, relation=None,direction='best', Comments=True,graphType='png', graphSize='7,7', fontSize=10,Debug=True): """ export GraphViz dot file for Hasse diagram drawing filtering. """ import os from copy import copy as deepcopy def _safeName(t0): t = t0.split(sep="-") t1 = t[0] n = len(t) if n > 1: for i in range(1,n): t1 += '%s%s' % ('_',t[i]) return t1 # working on a deepcopy of self digraph = deepcopy(self) digraph.computeTopologicalRanking() if not digraph.Acyclic: print('Error: not a transitive digraph !!!') return topologicalRanking = digraph.topologicalRanking if Debug: print(topologicalRanking) ## if direction == 'best': ## try: ## rankingByChoosing = digraph.rankingByBestChoosing['result'] ## except: ## digraph.computeRankingByBestChoosing() ## rankingByChoosing = digraph.rankingByBestChoosing['result'] ## else: ## try: ## rankingByChoosing = digraph.rankingByLastChoosing['result'] ## except: ## digraph.computeRankingByLastChoosing() ## rankingByChoosing = digraph.rankingByLastChoosing['result'] ## if Debug: ## print(rankingByChoosing) if Comments: print('*---- exporting a dot file for GraphViz tools ---------*') actionKeys = [x for x in digraph.actions] n = len(actionKeys) if relation is None: relation = deepcopy(digraph.relation) Med = digraph.valuationdomain['med'] i = 0 if fileName is None: name = digraph.name else: name = fileName dotName = name+'.dot' if Comments: print('Exporting to '+dotName) ## if bestChoice != set(): ## rankBestString = '{rank=max; ' ## if worstChoice != set(): ## rankWorstString = '{rank=min; ' fo = open(dotName,'w') fo.write('digraph G {\n') fo.write('graph [ bgcolor = cornsilk, ordering = out, fontname = "Helvetica-Oblique",\n fontsize = 12,\n label = "') fo.write('\\nDigraph3 (graphviz)\\n R. Bisdorff, 2020", size="') fo.write(graphSize),fo.write('",fontsize=%d];\n' % fontSize) # nodes for x in actionKeys: try: nodeName = digraph.actions[x]['shortName'] except: nodeName = str(x) node = '%s [shape = "circle", label = "%s", fontsize=%d];\n'\ % (str(_safeName(x)),_safeName(nodeName),fontSize) fo.write(node) # same ranks for Hasses equivalence classes k = len(topologicalRanking) i = 1 for ich in topologicalRanking: sameRank = '{ rank = %d ; ' % i for x in topologicalRanking[ich]: sameRank += str(_safeName(x))+'; ' sameRank += '}\n' print(i,sameRank) fo.write(sameRank) i += 1 ## k = len(rankingByChoosing) ## for i in range(k): ## sameRank = '{ rank = same; ' ## ich = rankingByChoosing[i][1] ## for x in ich: ## sameRank += str(_safeName(x))+'; ' ## sameRank += '}\n' ## print(i,sameRank) ## fo.write(sameRank) # save original relation #originalRelation = deepcopy(relation) #digraph.closeTransitive(Reverse=False) relation = digraph.closeTransitive(Reverse=True,InSite=False) k = len(topologicalRanking) for i in range(1,k+1): ich = topologicalRanking[i] for x in ich: for j in range(i+1,k+1): jch = topologicalRanking[j] for y in jch: if relation[x][y] > digraph.valuationdomain['med']: arcColor = 'black' edge = '%s-> %s [style="setlinewidth(%d)",color=%s] ;\n' %\ (_safeName(x),_safeName(y),1,arcColor) fo.write(edge) fo.write('}\n \n') fo.close() commandString = 'dot -Grankdir=TB -T'+graphType+' ' + \ dotName+' -o '+name+'.'+graphType #commandString = 'dot -T'+graphType+' ' +dotName+' -o '+name+'.'+graphType if Comments: print(commandString) try: os.system(commandString) except: if Comments: print('graphViz tools not avalaible! Please check installation.')
[docs] def exportGraphViz(self,fileName=None,direction='best', WithBestPathDecoration=False, WithRatingDecoration=False, Comments=True,graphType='png', graphSize='7,7',bgcolor='cornsilk', fontSize=10,Debug=False): """ export GraphViz dot file for Hasse diagram drawing filtering. """ import os from copy import copy as deepcopy def _safeName(t0): t = t0.split(sep="-") t1 = t[0] n = len(t) if n > 1: for i in range(1,n): t1 += '%s%s' % ('_',t[i]) return t1 # working on a deepcopy of self digraph = deepcopy(self) if direction == 'best': try: rankingByChoosing = digraph.rankingByBestChoosing['result'] except: digraph.computeRankingByBestChoosing() rankingByChoosing = digraph.rankingByBestChoosing['result'] else: try: rankingByChoosing = digraph.rankingByLastChoosing['result'] except: digraph.computeRankingByLastChoosing() rankingByChoosing = digraph.rankingByLastChoosing['result'] if Debug: print(rankingByChoosing) if Comments: print('*---- exporting a dot file for GraphViz tools ---------*') actionKeys = [x for x in digraph.actions] n = len(actionKeys) #if relation is None: # relation = deepcopy(digraph.relation) Med = digraph.valuationdomain['med'] i = 0 if fileName is None: name = digraph.name else: name = fileName dotName = name+'.dot' if Comments: print('Exporting to '+dotName) ## if bestChoice != set(): ## rankBestString = '{rank=max; ' ## if worstChoice != set(): ## rankWorstString = '{rank=min; ' fo = open(dotName,'w') fo.write('digraph G {\n') if bgcolor is None: fo.write('graph [ ordering = out, fontname = "Helvetica-Oblique",\n fontsize = 12,\n label = "') else: fo.write('graph [ bgcolor = %s, ordering = out, fontname = "Helvetica-Oblique",\n fontsize = 12,\n label = "' % bgcolor) fo.write('\\nDigraph3 (graphviz)\\n R. Bisdorff, 2020", size="') fo.write(graphSize),fo.write('",fontsize=%d];\n' % fontSize) # nodes for x in actionKeys: if WithRatingDecoration: if x in digraph.profiles: cat = digraph.profiles[x]['category'] if digraph.LowerClosed: nodeName = digraph.categories[cat]['lowLimit'] + ' -' else: nodeName = '- ' +digraph.categories[cat]['highLimit'] node = '%s [shape = "box", fillcolor=lightcoral, style=filled, label = "%s", fontsize=%d];\n'\ % (str(x),nodeName,fontSize) else: try: nodeName = digraph.actions[x]['shortName'] except: nodeName = str(x) node = '%s [shape = "circle", label = "%s", fontsize=%d];\n'\ % (str(_safeName(x)),_safeName(nodeName),fontSize) elif WithBestPathDecoration: try: nodeName = digraph.actions[x]['shortName'] except: nodeName = str(x) if x in digraph.optimalPath: node = '%s [shape = "circle", fillcolor=lightcoral, style=filled, label = "%s", fontsize=%d];\n'\ % (str(_safeName(x)),_safeName(nodeName),fontSize) else: node = '%s [shape = "circle", label = "%s", fontsize=%d];\n'\ % (str(_safeName(x)),_safeName(nodeName),fontSize) else: try: nodeName = digraph.actions[x]['shortName'] except: nodeName = str(x) node = '%s [shape = "circle", label = "%s", fontsize=%d];\n'\ % (str(_safeName(x)),_safeName(nodeName),fontSize) fo.write(node) # same ranks for Hasses equivalence classes k = len(rankingByChoosing) for i in range(k): sameRank = '{ rank = %d; ' % i ich = rankingByChoosing[i][1] for x in ich: sameRank += str(_safeName(x))+'; ' sameRank += '}\n' print(i,sameRank) fo.write(sameRank) # open transitive links and write the positive arcs relation = digraph.closeTransitive(Reverse=True,InSite=False) for i in range(k-1): ich = rankingByChoosing[i][1] for x in ich: for j in range(i+1,k): jch = rankingByChoosing[j][1] for y in jch: #edge = 'n'+str(i+1)+'-> n'+str(i+2)+' [dir=forward,style="setlinewidth(1)",color=black, arrowhead=normal] ;\n' if WithBestPathDecoration: if x in digraph.optimalPath and y in digraph.optimalPath: arcColor = 'blue' lineWidth = 2 if relation[x][y] > digraph.valuationdomain['med']: #arcColor = 'black' edge = '%s-> %s [label="%.0f",style="setlinewidth(%d)",color=%s] ;\n' %\ (_safeName(x),_safeName(y),digraph.costs[x][y],lineWidth,arcColor) fo.write(edge) elif relation[y][x] > digraph.valuationdomain['med']: #arcColor = 'black' edge = '%s-> %s [label="%.0f",style="setlinewidth(%d)",color=%s] ;\n' %\ (_safeName(y),_safeName(x),digraph.costs[y][x],lineWidth,arcColor) fo.write(edge) else: arcColor= 'grey' lineWidth = 1 if relation[x][y] > digraph.valuationdomain['med']: #arcColor = 'black' edge = '%s-> %s [style="setlinewidth(%d)",color=%s] ;\n' %\ (_safeName(x),_safeName(y),lineWidth,arcColor) fo.write(edge) elif relation[y][x] > digraph.valuationdomain['med']: #arcColor = 'black' edge = '%s-> %s [style="setlinewidth(%d)",color=%s] ;\n' %\ (_safeName(y),_safeName(x),lineWidth,arcColor) fo.write(edge) else: if relation[x][y] > digraph.valuationdomain['med']: arcColor = 'black' edge = '%s-> %s [style="setlinewidth(%d)",color=%s] ;\n' %\ (_safeName(x),_safeName(y),1,arcColor) fo.write(edge) elif relation[y][x] > digraph.valuationdomain['med']: arcColor = 'black' edge = '%s-> %s [style="setlinewidth(%d)",color=%s] ;\n' %\ (_safeName(y),_safeName(x),1,arcColor) fo.write(edge) fo.write('}\n \n') fo.close() # restore original relation #relation = deepcopy(originalRelation) commandString = 'dot -Grankdir=TB -T'+graphType+' ' +dotName+\ ' -o '+name+'.'+graphType #commandString = 'dot -T'+graphType+' ' +dotName+' -o '+name+'.'+graphType if Comments: print(commandString) try: os.system(commandString) except: if Comments: print('graphViz tools not avalaible! Please check installation.')
[docs] class RankingsFusionDigraph(TransitiveDigraph): """ Specialization of the abstract TransitiveDigraph class for digraphs resulting from the epistemic disjunctive or conjunctive fusion (omax|omin operator) of a list of rankings. *Parameters*: * other = either a Digraph or a PerformanceTableau object; * fusionOperator = 'o-max' (default) | 'o-min' : Disjunctive, resp. conjuntive epistemic fusion. Example application: >>> from transitiveDigraphs import RankingsFusionDigraph >>> from sparseOutrankingDigraphs import PreRankedOutrankingDigraph >>> t = RandomPerformanceTableau(seed=1) >>> pr = PreRankedOutrankingDigraph(t,10,quantilesOrderingStrategy='average') >>> r1 = pr.boostedRanking >>> pro = PreRankedOutrankingDigraph(t,10,quantilesOrderingStrategy='optimistic') >>> r2 = pro.boostedRanking >>> prp = PreRankedOutrankingDigraph(t,10,quantilesOrderingStrategy='pessimistic') >>> r3 = prp.boostedRanking >>> wqr = RankingsFusionDigraph(pr,[r1,r2,r3]) >>> wqr.boostedRanking ['a07', 'a10', 'a06', 'a11', 'a02', 'a08', 'a04', 'a03', 'a01', 'a09', 'a12', 'a13', 'a05'] """ def __init__(self,other,rankings,fusionOperator='o-max',Debug=False): from digraphsTools import ranking2preorder, omax, omin from copy import deepcopy from decimal import Decimal if Debug: print(rankings) if len(rankings) < 1: print('Error: several rankings have to be provided!') return self.__dict__ = deepcopy(other.__dict__) self.order = len(self.actions) self.name = other.name + '_wk' self.valuationdomain = {} self.valuationdomain['min'] = Decimal('-1') self.valuationdomain['max'] = Decimal('1') self.valuationdomain['med'] = Decimal('0') Med = self.valuationdomain['med'] relations = [] for rel in rankings: if Debug: print(rel) relations.append(self.computePreorderRelation(ranking2preorder(rel))) if Debug: print(relations) relation = {} for x in self.actions: relation[x] = {} for y in self.actions: L = [relations[i][x][y] for i in range(len(relations))] if fusionOperator == 'o-max': relation[x][y] = omax(Med,L) elif fusionOperator == 'o-min': relation[x][y] = omin(Med,L) else: print('Error: incorrect fusion operator %s' % fusionOperator) return if Debug: print(x,y,L,relation[x][y]) if Debug: print(relation) self.relation = relation self.gamma = self.gammaSets() self.notGamma = self.notGammaSets()
[docs] class RankingsFusion(RankingsFusionDigraph): """ Obsolete dummy for the RankingsFusionDigraph class """
[docs] class KemenyOrdersFusion(TransitiveDigraph): """ Specialization of the abstract TransitiveDigraph class for transitive digraphs resulting from the epistemic disjunctive (default) or conjunctive fusion of all potential Kemeny linear orderings. *Parameter*: * fusionOperator = 'o-max' (default) | 'o-min' : Disjunctive, resp. conjuntive epistemic fusion. """ def __init__(self,other,orderLimit=7,fusionOperator='o-max', Debug=False): if other.order > orderLimit: print('Digraph order %d to high. The default limit (7) may be changed with the oderLimit argument.') return from digraphsTools import ranking2preorder, omax, omin from copy import deepcopy from decimal import Decimal self.__dict__ = deepcopy(other.__dict__) self.name = other.name + '_wk' self.valuationdomain['min'] = Decimal('-1') self.valuationdomain['max'] = Decimal('1') self.valuationdomain['med'] = Decimal('0') Med = self.valuationdomain['med'] #relation = copy(other.relation) if self.computeKemenyRanking(orderLimit=orderLimit,Debug=False) is None: # [0] = ordered actions list, [1] = maximal Kemeny index print('Intantiation error: unable to compute the Kemeny Order !!!') print('Digraph order %d is required to be lower than 8!' % n) return kemenyRankings = self.maximalRankings if Debug: print(kemenyRankings) relations = [] for rel in kemenyRankings: #print(rel) relations.append(self.computePreorderRelation(ranking2preorder(rel))) if Debug: print(relations) relation = {} for x in self.actions: relation[x] = {} for y in self.actions: L = [relations[i][x][y] for i in range(len(relations))] if fusionOperator == 'o-max': relation[x][y] = omax(Med,L) elif fusionOperator == 'o-min': relation[x][y] = omin(Med,L) else: print('Error: incorrect fusion operator %s' % fusionOperator) return if Debug: print(x,y,L,relation[x][y]) if Debug: print(relation) self.relation = relation #print(self.relation) self.gamma = self.gammaSets() self.notGamma = self.notGammaSets()
# myThread for KohlerArrawRaynaudFusion ## if Threading: ## from multiprocessing import Process, Lock, active_children, cpu_count class _myKARThread(Process): def __init__(self, threadID, name, direction, tempDirName, Debug): Process.__init__(self) self.threadID = threadID self.name = name self.direction = direction self.workingDirectory = tempDirName self.Debug = Debug def run(self): from linearOrders import KohlerOrder from pickle import dumps, loads from os import chdir chdir(self.workingDirectory) from sys import setrecursionlimit setrecursionlimit(2**20) if self.Debug: print("Starting working in %s on %s" % (self.workingDirectory, self.name)) #threadLock.acquire() fi = open('dumpDigraph.py','rb') digraph = loads(fi.read()) fi.close() if self.direction == 'best': fo = open('ko.py','wb') ko = KohlerOrder(digraph) fo.write(dumps(ko.relation,-1)) elif self.direction == 'worst': fo = open('ar.py','wb') ar = KohlerOrder((~(-digraph))) fo.write(dumps(ar.relation,-1)) fo.close() #threadLock.release()
[docs] class KohlerArrowRaynaudFusion(TransitiveDigraph): """ Specialization of the abstract TransitiveDigraph class for ranking-by-choosing orderings resulting from the epistemic disjunctive (o-max) or conjunctive (o-min) fusion of a Kohler linear best ordering and an Arrow-Raynaud linear worst ordering. """ def __init__(self,outrankingDigraph, fusionOperator='o-max', Threading=True, Debug=False): from digraphsTools import ranking2preorder, omax, omin from copy import copy as deepcopy from pickle import dumps, loads, load from linearOrders import KohlerOrder self.Debug=Debug self.Threading = Threading digraph=deepcopy(outrankingDigraph) digraph.recodeValuation(-1.0,1.0) self.name = digraph.name #self.__class__ = digraph.__class__ self.actions = deepcopy(digraph.actions) self.order = len(self.actions) self.valuationdomain = deepcopy(digraph.valuationdomain) self.originalRelation = digraph.relation if Threading and cpu_count()>2: print('Threading ...') from tempfile import TemporaryDirectory with TemporaryDirectory() as tempDirName: digraphFileName = tempDirName +'/dumpDigraph.py' if Debug: print('temDirName, digraphFileName', tempDirName,digraphFileName) fo = open(digraphFileName,'wb') pd = dumps(digraph,-1) fo.write(pd) fo.close() threadBest = _myKARThread(1,"ComputeBest","best",tempDirName,Debug) threadWorst = _myKARThread(2,"ComputeWorst","worst",tempDirName,Debug) threadBest.start() threadWorst.start() while active_children() != []: pass print('Exiting computing threads') koFileName = tempDirName +'/ko.py' fi = open(koFileName,'rb') KohlerRelation = loads(fi.read()) fi.close() arFileName = tempDirName + '/ar.py' fi = open(arFileName,'rb') ArrowRaynaudRelation = loads(fi.read()) fi.close() else: ko = KohlerOrder(digraph) ar = KohlerOrder((~(-digraph))) KohlerRelation = deepcopy(ko.relation) ArrowRaynaudRelation = deepcopy(ar.relation) if Debug: print('Kohler = ', KohlerRelation) print('ArrowRaynaud = ', ArrowRaynaudRelation) relation = {} Med = self.valuationdomain['med'] for x in self.actions: relation[x] = {} for y in self.actions: L = [KohlerRelation[x][y],ArrowRaynaudRelation[x][y]] if fusionOperator == "o-max": relation[x][y] = omax(Med,L) elif fusionOperator == "o-min": relation[x][y] = omin(Med,L) else: print('Error: invalid epistemic fusion operator %s' % fusionOperator) return if Debug: print('!',x,y,KohlerRelation[x][y], ArrowRaynaudRelation[x][y],relFusion[x][y]) self.relation=deepcopy(relation) self.gamma = self.gammaSets() self.notGamma = self.notGammaSets()
#--------------------- # my Thread for the RankingByChoosing class class _myRBCThread(Process): def __init__(self, threadID, name, direction, tempDirName, CoDual, Debug): Process.__init__(self) self.threadID = threadID self.name = name self.direction = direction self.workingDirectory = tempDirName self.CoDual = CoDual self.Debug = Debug def run(self): from pickle import dumps, loads from os import chdir chdir(self.workingDirectory) from sys import setrecursionlimit setrecursionlimit(2**20) Debug = self.Debug CoDual = self.CoDual if Debug: print("Starting working in %s on %s" % (self.workingDirectory, self.name)) #threadLock.acquire() fi = open('dumpDigraph.py','rb') digraph = loads(fi.read()) fi.close() if self.direction == 'best': fo = open('rbbc.py','wb') rbbc = digraph.computeRankingByBestChoosing(CoDual=CoDual,Debug=Debug) fo.write(dumps(rbbc,-1)) elif self.direction == 'worst': fo = open('rbwc.py','wb') rbwc = digraph.computeRankingByLastChoosing(CoDual=CoDual,Debug=Debug) fo.write(dumps(rbwc,-1)) fo.close()
[docs] class RankingByChoosingDigraph(TransitiveDigraph): """ Specialization of the abstract TransitiveDigraph class for ranking-by-Rubis-choosing orderings. Example python3 session: >>> from outrankingDigraphs import * >>> t = RandomCBPerformanceTableau(numberOfActions=7, ... numberOfCriteria=5, ... weightDistribution='equiobjectives') >>> g = BipolarOutrankingDigraph(t,Normalized=True) >>> g.showRelationTable() * ---- Relation Table ----- r | 'a1' 'a2' 'a3' 'a4' 'a5' 'a6' 'a7' -----|------------------------------------------------------------ 'a1' | +1.00 +1.00 +0.67 +0.33 +0.33 -0.17 +0.00 'a2' | -1.00 +1.00 +0.00 +0.00 +1.00 +0.00 +0.50 'a3' | -0.33 +0.00 +1.00 +0.17 +0.17 -0.17 +0.00 'a4' | +0.00 +0.00 +0.50 +1.00 +0.17 -0.33 -0.50 'a5' | -0.33 -1.00 +0.00 -0.17 +1.00 +0.17 +0.00 'a6' | +0.17 +0.00 +0.42 +0.33 -0.17 +1.00 +0.00 'a7' | +0.00 +0.42 +0.00 +0.50 +0.00 +0.00 +1.00 Valuation domain: [-1.000; 1.000] >>> from transitiveDigraphs import RankingByChoosingDigraph >>> rbc = RankingByChoosingDigraph(g) Threading ... Exiting computing threads >>> rbc.showTransitiveDigraph() Ranking by Choosing and Rejecting 1st ranked ['a3', 'a5', 'a6'] 2nd ranked ['a4'] 2nd last ranked ['a7']) 1st last ranked ['a1', 'a2']) >>> rbc.showOrderedRelationTable(direction="decreasing") Decreasing Weak Ordering Ranking by recursively best-choosing 1st Best Choice ['a3', 'a6'] (0.07) 2nd Best Choice ['a5'] (0.08) 3rd Best Choice ['a1', 'a4'] (0.17) 4th Best Choice ['a7'] (-0.67) 5th Best Choice ['a2'] (1.00) * ---- Relation Table ----- S | 'a3' 'a6' 'a5' 'a1' 'a4' 'a7' 'a2' ------|------------------------------------------- 'a3' | - 0.00 0.00 0.67 0.33 0.00 1.00 'a6' | 0.00 - 0.00 0.00 0.00 0.00 0.17 'a5' | 0.00 0.00 - 0.33 0.25 0.17 0.17 'a1' | -0.67 0.00 -0.33 - 0.00 0.00 0.00 'a4' | 0.00 0.00 0.00 0.00 - 0.33 0.33 'a7' | 0.00 0.00 0.00 0.00 0.00 - 0.67 'a2' | -1.00 -0.17 -0.17 0.00 -0.33 -0.67 - Valuation domain: [-1.00;1.00] """ def __repr__(self): """ Presentation method for RankingByChoosing Digraph instance. """ String = '*----- Object instance description -----------*\n' String += 'Instance class : %s\n' % self.__class__.__name__ String += 'Instance name : %s\n' % self.name String += 'Actions : %d\n' % len(self.actions) String += 'Valuation domain : [%.2f-%.2f]\n' %\ (self.valuationdomain['min'],self.valuationdomain['max']) String += 'Size : %d\n' % self.computeSize() String += 'Attributes: %s\n' % list(self.__dict__.keys()) String += 'Determinateness (%%) : %.1f\n' %\ self.computeDeterminateness(InPercents=True) String += '*------ Constructor run times (in sec.) ------*\n' String += 'Threads : %d\n' % self.nbrThreads String += 'Total time : %.5f\n' % self.runTimes['totalTime'] String += 'Data input : %.5f\n' % self.runTimes['dataInput'] String += 'Ranking-by-choosing : %.5f\n' % self.runTimes['bestLast'] String += 'Compute fusion : %.5f\n' % self.runTimes['fusing'] String += 'Store results : %.5f\n' % self.runTimes['storing'] return String def __init__(self,other, fusionOperator = "o-max", CoDual=False, Debug=False, #CppAgrum=False, Threading=True): from digraphsTools import ranking2preorder, omax, omin from copy import copy, deepcopy from pickle import dumps, loads, load from time import time self.CoDual=CoDual self.Debug=Debug #self.CppAgrum = CppAgrum self.Threading = Threading runTimes = {} t0 = time() #if Threading: digraph=deepcopy(other) digraph.recodeValuation(-1.0,1.0) self.name = digraph.name #self.__class__ = digraph.__class__ self.actions = copy(digraph.actions) self.order = len(self.actions) self.valuationdomain = digraph.valuationdomain self.originalRelation = digraph.relation runTimes['dataInput'] = time() - t0 # compute ranking by best and by last choosing t1 = time() if Threading and cpu_count()>2: print('Threading ...') self.nbrThreads = 2 from tempfile import TemporaryDirectory with TemporaryDirectory() as tempDirName: digraphFileName = tempDirName +'/dumpDigraph.py' if Debug: print('temDirName, digraphFileName', tempDirName,digraphFileName) fo = open(digraphFileName,'wb') pd = dumps(digraph,-1) fo.write(pd) fo.close() threadBest = _myRBCThread(1,"ComputeBest","best",tempDirName,CoDual,Debug) threadWorst = _myRBCThread(2,"ComputeWorst","worst",tempDirName,CoDual,Debug) threadBest.start() threadWorst.start() while active_children() != []: pass print('Exiting computing threads') rbbcFileName = tempDirName +'/rbbc.py' fi = open(rbbcFileName,'rb') digraph.rankingByBestChoosing = loads(fi.read()) fi.close() rbwcFileName = tempDirName + '/rbwc.py' fi = open(rbwcFileName,'rb') digraph.rankingByLastChoosing = loads(fi.read()) fi.close() else: self.nbrThreads = 1 from sys import setrecursionlimit setrecursionlimit(2**20) digraph.computeRankingByBestChoosing(CoDual=CoDual,Debug=Debug) digraph.computeRankingByLastChoosing(CoDual=CoDual,Debug=Debug) setrecursionlimit(1000) runTimes['bestLast'] = time() - t1 # compute ranking fusion t2 = time() relBest = digraph.computeRankingByBestChoosingRelation() if Debug: digraph.showRankingByBestChoosing() relLast = digraph.computeRankingByLastChoosingRelation() if Debug: digraph.showRankingByLastChoosing() relFusion = {} Med = digraph.valuationdomain['med'] for x in digraph.actions: relFusion[x] = {} fx = relFusion[x] bx = relBest[x] lx = relLast[x] for y in digraph.actions: L = [bx[y],lx[y]] if fusionOperator == "o-max": fx[y] = omax(Med,L) elif fusionOperator == "o-min": fx[y] = omin(Med,L) else: print('Error: invalid epistemic fusion operator %s' % operator) return runTimes['fusing'] = time() - t2 # storing results t3 = time() self.relation=relFusion self.rankingByLastChoosing = copy(digraph.rankingByLastChoosing) self.rankingByBestChoosing = copy(digraph.rankingByBestChoosing) if Debug: self.computeRankingByChoosing() self.showRankingByChoosing() self.gamma = self.gammaSets() self.notGamma = self.notGammaSets() runTimes['storing'] = time() -t3 runTimes['totalTime'] = time() - t0 self.runTimes = runTimes
[docs] def showTransitiveDigraph(self,rankingByChoosing=None,rankingStrategy='optimistic',WithCoverCredibility=False): """ Specialization of generic method. Without argument, a weak ordering is recomputed from the valued self relation. """ if rankingByChoosing is None: if rankingStrategy == 'optimistic': TransitiveDigraph.showTransitiveDigraph(self, rankingByChoosing=self.computeRankingByBestChoosing()) else: TransitiveDigraph.showTransitiveDigraph(self, rankingByChoosing=self.computeRankingByLastChoosing()) else: TransitiveDigraph.showTransitiveDigraph(self,rankingByChoosing=rankingByChoosing)
[docs] def showRankingByChoosing(self,rankingByChoosing=None,rankingStrategy='optimistic',WithCoverCredibility=False): """ Dummy for showTransitiveDigraph method """ self.showTransitiveDigraph(rankingByChoosing=rankingByChoosing, rankingStrategy=rankingStrategy, WithCoverCredibility=WithCoverCredibility)
[docs] def computeRankingByBestChoosing(self,Forced=False): """ Dummy for blocking recomputing without forcing. """ if Forced: TransitiveDigraph.computeRankingByBestChoosing(self,CoDual=self.CoDual)
[docs] def computeRankingByLastChoosing(self,Forced=False): """ Dummy for blocking recomputing without forcing. """ if Forced: TransitiveDigraph.computeRankingByLastChoosing(self,CoDual=self.CoDual)
#--------------------
[docs] class RankingByBestChoosingDigraph(RankingByChoosingDigraph): """ Specialization of abstract TransitiveDigraph class for computing a ranking by best-choosing. """ def __init__(self,digraph,Normalized=True,CoDual=False,Debug=False): from copy import copy as deepcopy digraphName = 'ranking-by-best'+digraph.name self.name = deepcopy(digraphName) self.actions = deepcopy(digraph.actions) self.order = len(self.actions) self.valuationdomain = deepcopy(digraph.valuationdomain) digraph.computeRankingByBestChoosing(CoDual=CoDual,Debug=False) self.relation = digraph.computeRankingByBestChoosingRelation() if Normalized: self.recodeValuation(-1,1) self.order = len(self.actions) self.gamma = self.gammaSets() self.notGamma = self.notGammaSets() self.rankingByBestChoosing = digraph.rankingByBestChoosing
[docs] def showTransitiveDigraph(self): """ Specialisation of showTransitiveDigraph() for ranking-by-best-choosing results. """ self.showRankingByBestChoosing()
[docs] class RankingByLastChoosingDigraph(RankingByChoosingDigraph): """ Specialization of abstract TransitiveDigraph class for computing a ranking by rejecting. """ def __init__(self,digraph,Normalized=True,CoDual=False,Debug=False): from copy import copy as deepcopy digraphName = 'ranking-by-last'+digraph.name self.name = deepcopy(digraphName) self.actions = deepcopy(digraph.actions) self.order = len(self.actions) self.valuationdomain = deepcopy(digraph.valuationdomain) digraph.computeRankingByLastChoosing(CoDual=CoDual,Debug=False) self.relation = digraph.computeRankingByLastChoosingRelation() if Normalized: self.recodeValuation(-1,1) self.order = len(self.actions) self.gamma = self.gammaSets() self.notGamma = self.notGammaSets() self.rankingByLastChoosing = digraph.rankingByLastChoosing
[docs] def showTransitiveDigraph(self): """ Specialisation of showTransitiveDigraph() for ranking-by-last-choosing results. """ self.showRankingByLastChoosing()
[docs] class RankingByPrudentChoosingDigraph(RankingByChoosingDigraph): """ Specialization for ranking-by-rejecting results with prudent single elimination of chordless circuits. By default, the cut level for circuits elimination is set to 20% of the valuation domain maximum (1.0). """ def __init__(self,digraph,CoDual=False,Normalized=True,Odd=True,Limited=0.2,Comments=False,Debug=False,SplitCorrelation=True): from copy import copy, deepcopy from time import time if Comments: t0 = time() print('------- Commenting ranking by prudent choosing ------') digraph_ = deepcopy(digraph) if Normalized: digraph_.recodeValuation(-1,1) digraphName = 'sorting-by-prudent-choosing'+digraph_.name self.name = digraphName self.actions = copy(digraph_.actions) self.order = len(self.actions) self.valuationdomain = copy(digraph_.valuationdomain) s1 = RankingByLastChoosingDigraph(digraph_,CoDual=CoDual,Debug=False) s2 = RankingByBestChoosingDigraph(digraph_,CoDual=CoDual,Debug=False) fus = FusionDigraph(s1,s2) cutLevel = min(digraph_.minimalValuationLevelForCircuitsElimination(Odd=Odd,Debug=Debug,Comments=Comments),Decimal(Limited)) self.cutLevel = cutLevel if cutLevel > self.valuationdomain['med']: if cutLevel < self.valuationdomain['max']: gp = PolarisedDigraph(digraph_,level=cutLevel,StrictCut=True) else: gp = PolarisedDigraph(digraph_,level=cutLevel,StrictCut=False) s1p = RankingByLastChoosingDigraph(gp,CoDual=CoDual,Debug=False) s2p = RankingByBestChoosingDigraph(gp,CoDual=CoDual,Debug=False) fusp = FusionDigraph(s1p,s2p) corrgp = digraph_.computeOrdinalCorrelation(fusp) corrg = digraph_.computeOrdinalCorrelation(fus) if Comments: print('Correlation with cutting : %.3f x %.3f = %.3f' % (corrgp['correlation'],corrgp['determination'],corrgp['correlation']*corrgp['determination'])) print('Correlation without cutting : %.3f x %.3f = %.3f' % (corrg['correlation'],corrg['determination'],corrg['correlation']*corrg['determination'])) if SplitCorrelation: if corrgp['correlation'] > corrg['correlation']: self.relation = deepcopy(fusp.relation) self.rankingByBestChoosing = sp2.rankingByBestChoosing self.rankingByLastChoosing = sp1.rankingByLastChoosing else: self.relation = deepcopy(fus.relation) self.rankingByBestChoosing = s2.rankingByBestChoosing self.rankingByLastChoosing = s1.rankingByLastChoosing else: if (corrgp['correlation']*corrgp['determination']) > (corrg['correlation']*corrg['determination']): self.relation = deepcopy(fusp.relation) self.rankingByBestChoosing = sp2.rankingByBestChoosing self.rankingByLastChoosing = sp1.rankingByLastChoosing else: self.relation = deepcopy(fus.relation) self.rankingByBestChoosing = s2.rankingByBestChoosing self.rankingByLastChoosing = s1.rankingByLastChoosing else: self.relation = copy(fus.relation) self.rankingByBestChoosing = s2.rankingByBestChoosing self.rankingByLastChoosing = s1.rankingByLastChoosing #self.rankingByChoosing = self.computeRankingByChoosing(CoDual=CoDual) self.gamma = self.gammaSets() self.notGamma = self.notGammaSets() if Comments: t1 = time() gdeter = digraph_.computeDeterminateness() self.showTransitiveDigraph() print('Circuits cutting level limit : %.3f' % Limited) print('Circuits elimination cut level: %.3f' % self.cutLevel) print('Ordinal Correlation with given outranking') corr = digraph_.computeOrdinalCorrelation(self) print('Correlation : %.3f' % corr['correlation']) print('Determinateness : %.3f (%.3f)' % (corr['determination'],gdeter)) print('Execution time : %.4f sec.' % (t1-t0))
# multiprocessing thread for the principal fusion class #from multiprocessing import Process, Lock, active_children, cpu_count class _myPRIODThread(Process): def __init__(self, threadID, name, direction, tempDirName, imageType, plotFileName, Debug): Process.__init__(self) self.threadID = threadID self.name = name self.direction = direction self.workingDirectory = tempDirName self.imageType = imageType self.plotFileName = plotFileName self.Debug = Debug def run(self): from pickle import dumps, loads from os import chdir from linearOrders import PrincipalOrder chdir(self.workingDirectory) if self.Debug: print("Starting working in %s on %s" % (self.workingDirectory,self.name) ) fi = open('dumpDigraph.py','rb') digraph = loads(fi.read()) fi.close() if self.direction == 'col': fo = open('priCol.py','wb') pc = PrincipalOrder(digraph, Colwise=True, imageType=self.imageType, plotFileName=self.plotFileName, Debug=self.Debug) fo.write(dumps(pc,-1)) elif self.direction == 'row': fo = open('priRow.py','wb') pl = PrincipalOrder(digraph, Colwise=False, imageType=self.imageType, plotFileName=self.plotFileName, Debug=self.Debug) fo.write(dumps(pl,-1)) fo.close()
[docs] class PrincipalInOutDegreesOrderingFusion(TransitiveDigraph): """ Specialization of abstract TransitiveDigraph class for ranking by fusion of the principal orders of the variance-covariance of in- (Colwise) and outdegrees (Rowwise). Example Python3 session with same outranking digraph g as shown in the RankingByChoosingDigraph example session (see below). >>> from transitiveDigraphs import * >>> from votingProfiles import * >>> v = RandomLinearVotingProfile(numberOfVoters=99, ... numberOfCandidates=9,seed=201) >>> g = CondorcetDigraph(v) >>> pro = PrincipalInOutDegreesOrdering(g,imageType="png", ... plotFileName="proTransitiveDigraphing") Threading ... Exiting both computing threads >>> pro.showTransitiveDigraph() Ranking by Choosing and Rejecting 1st ranked ['c9'] 2nd ranked ['c8'] 3rd ranked ['c4'] 4th ranked ['c5'] 5th ranked ['c2'] 5th last ranked ['c2']) 4th last ranked ['c3']) 3rd last ranked ['c6']) 2nd last ranked ['c7']) 1st last ranked ['c1']) >>> pro.showPrincipalScores(ColwiseOrder=True) List of principal scores Column wise covariance ordered action colwise rowwise c9 0.23974 0.23974 c8 0.15961 0.15961 c4 0.14299 0.14299 c5 0.04205 0.04205 c2 -0.04186 -0.04186 c3 -0.04552 -0.04552 c6 -0.11143 -0.11143 c7 -0.16567 -0.16567 c1 -0.21991 -0.21991 """ def __init__(self,other,fusionOperator="o-max", imageType=None, plotFileName=None, Threading=True, Debug=False): from copy import copy, deepcopy from linearOrders import PrincipalOrder from pickle import dumps, loads, load #if Threading: digraph = deepcopy(other) digraph.recodeValuation(-1.0,1.0) self.name = digraph.name #self.__class__ = digraph.__class__ if isinstance(digraph.actions,list): self.actions = {} for x in digraph.actions: self.actions[x] = {} self.actions[x]['name'] = x else: self.actions = digraph.actions self.order = len(self.actions) self.valuationdomain = digraph.valuationdomain if Threading and cpu_count()>2: print('Threading ...') from tempfile import TemporaryDirectory with TemporaryDirectory() as tempDirName: digraphFileName = tempDirName +'/dumpDigraph.py' if Debug: print('temDirName, digraphFileName', tempDirName,digraphFileName) fo = open(digraphFileName,'wb') pd = dumps(digraph,-1) fo.write(pd) fo.close() threadCol = _myPRIODThread(1,"ComputeCol","col", tempDirName, imageType=imageType, plotFileName=plotFileName, Debug=Debug) threadRow = _myPRIODThread(1,"ComputeRow","row", tempDirName, imageType=imageType, plotFileName=plotFileName, Debug=Debug) threadCol.start() threadRow.start() while active_children() != []: pass print('Exiting both computing threads') priColFileName = tempDirName+'/priCol.py' fi = open(priColFileName,'rb') pc = loads(fi.read()) fi.close() priRowFileName = tempDirName+'/priRow.py' fi = open(priRowFileName,'rb') pl = loads(fi.read()) fi.close() else: pc = PrincipalOrder(digraph,Colwise=True, imageType=imageType, plotFileName=plotFileName, Debug=Debug) pl = PrincipalOrder(digraph,Colwise=False, imageType=imageType, plotFileName=plotFileName, Debug=Debug) if Debug: print('Row wise: ') print(pl.principalRowwiseScores) pl.computeOrder() print(self.actions) for x in pl.principalRowwiseScores: print(x) self.principalRowwiseScores = copy(pl.principalRowwiseScores) for x in pl.principalRowwiseScores: self.actions[x[1]]['principalRowwiseScore'] = x[0] if Debug: print('Column wise: ') print(pc.principalColwiseScores) pc.computeOrder() self.principalColwiseScores = copy(pc.principalColwiseScores) for x in pc.principalColwiseScores: self.actions[x[1]]['principalColwiseScore'] = x[0] pf = FusionDigraph(pl,pc,operator=fusionOperator) self.relation = copy(pf.relation) self.gamma = self.gammaSets() self.notGamma = self.notGammaSets() #self.computeRankingByChoosing() #self.rankingByBestChoosing = pl.computeRankingByBestChoosing() #self.rankingByLastChoosing = pc.computeRankingByLastChoosing() if Debug: print(self.computeOrdinalCorrelation(digraph))
[docs] def showPrincipalScores(self, ColwiseOrder=False, RowwiseOrder=False): """ showing the principal in- (Colwise) and out-degrees (Rowwise) scores. """ print('List of principal scores') if not ColwiseOrder and not RowwiseOrder: print('action \t colwise \t rowwise') for x in self.actions: print('%s \t %.5f \t %.5f' %\ (x,self.actions[x]['principalColwiseScore'], self.actions[x]['principalRowwiseScore'])) elif ColwiseOrder: print('Column wise covariance ordered') print('action \t colwise \t rowwise') for (y,x) in self.principalColwiseScores: print('%s \t %.5f \t %.5f' %\ (x,self.actions[x]['principalColwiseScore'], self.actions[x]['principalRowwiseScore'])) else: print('Row wise covariance ordered') print('action \t colwise \t rowwise') for (y,x) in self.principalRowwiseScores: print('%s \t %.5f \t %.5f' %\ (x,self.actions[x]['principalColwiseScore'], self.actions[x]['principalRowwiseScore']))
[docs] def computeTransitiveDigraph(self, ColwiseOrder=False): """ Specialisation for PrincipalInOutDegreesOrderings. """ if ColwiseOrder: lps = self.principalColwiseScores else: lps = self.principalRowwiseScores n = len(lps) n2 = n//2 ordering = [] for i in range(n2): x = lps[i] y = lps[n-i-1] ordering.append( ( (x[0],[x[1]]),(y[0],[y[1]]) ) ) if 2*n2 < n: x = lps[n2] ordering.append( ( (x[0],[x[1]]),(x[0],[x[1]]) ) ) TransitiveDigraphing = {'result':ordering} return TransitiveDigraphing
[docs] def showTransitiveDigraph(self, ColwiseOrder=False): """ Specialisation for PrincipalInOutDegreesOrderings. """ ## if rankingByChoosing is None: ## try: ## TransitiveDigraphing = self.rankingByChoosing ## except: ## TransitiveDigraphing = self.computeRankingByChoosing(CoDual=False,CppAgrum=False) if ColwiseOrder: lps = self.principalColwiseScores else: lps = self.principalRowwiseScores n = len(lps) n2 = n//2 ordering = [] for i in range(n2): x = lps[i] y = lps[n-i-1] ordering.append( ( (x[0],[x[1]]),(y[0],[y[1]]) ) ) if 2*n2 < n: x = lps[n2] ordering.append( ( (x[0],[x[1]]),(x[0],[x[1]]) ) ) TransitiveDigraphing = {'result':ordering} #print(TransitiveDigraphing) TransitiveDigraph.showTransitiveDigraph(self,TransitiveDigraphing)
[docs] def exportGraphViz(self,fileName=None,direction='ColwiseOrder', Comments=True,graphType='png', graphSize='7,7', fontSize=10): """ Specialisation for PincipalInOutDegrees class. direction = "Colwise" (best to worst, default) | "Rowwise" (worst to best) """ if direction == "Colwise": direction = 'best' else: direction = 'worst' TransitiveDigraph.exportGraphViz(self, fileName=fileName, direction=direction, Comments=Comments, graphType=graphType, graphSize=graphSize, fontSize=fontSize)
################## def _jobTaskRubis(categID): """ Task definition for multiprocessing threaded local RubisChoice jobs in the QantilesRankingDigraph contructor. .. note:: Parameter maxContent: maximum allowed local catContent for RankingByRubisChoice is set to 30. Above this cardinality, the PrincipalInOutDegreesOrdering is used. """ from tempfile import TemporaryDirectory from os import getcwd, chdir from pickle import dumps, loads, load from copy import copy as deepcopy from outrankingDigraphs import BipolarOutrankingDigraph #from linearOrders import RankedPairsOrder, KohlerOrder from TransitiveDigraphs import PrincipalInOutDegreesOrdering maxCatContent = 30 Comments = False if Comments: print("Starting working on category %d" % (categID), end=" ") fiName = 'partialPerfTab-'+str(categID)+'.py' fi = open(fiName,'rb') pt = loads(fi.read()) fi.close() with TemporaryDirectory() as tempDirName: cwd = getcwd() chdir(tempDirName) digraph = BipolarOutrankingDigraph(pt,Normalized=True) Max = digraph.valuationdomain['max'] Med = digraph.valuationdomain['med'] catContent = [x for x in digraph.actions] nc = len(catContent) print(nc,maxCatContent) #print(catContent) if nc <= maxCatContent: currActions = list(catContent) try: catCRbc = digraph.computeRankingByChoosing(CoDual=True) except: if Comments: print('==>>> Failed RBC: Principal ranking') ## rp = RankedPairsOrder(digraph) ## catRbc = rp.computeRankingByChoosing() ## ko = KohlerOrder(digraph) ## catCRbc = ko.computeRankingByChoosing() try: pri = PrincipalInOutDegreesOrdering(digraph,Threading=False) catCRbc = pri.computeTransitiveDigraph() except: catCRbc = {'result': \ [((digraph.valuationdomain['max'],catContent), (digraph.valuationdomain['max'],catContent))]} else: if Comments: print('==>>> Exceeds %d: Principal ranking' % maxCatContent) try: pri = PrincipalInOutDegreesOrdering(digraph,Threading=False) catCRbc = pri.computeTransitiveDigraph() except: catCRbc = {'result': [((digraph.valuationdomain['max'],catContent), (digraph.valuationdomain['max'],catContent))]} splitCatRelation = [catCRbc['result']] chdir(cwd) foName = 'splitCatRelation-'+str(categID)+'.py' fo = open(foName,'wb') fo.write(dumps(splitCatRelation,-1)) fo.close() writestr = 'Finished category %d %d' % (categID,nc) return writestr ##### def _jobTaskKohler(categID): """ Task definition for multiprocessing threaded jobs in QsRbcRanking. .. Note:: The indiviual quantile classes are linarly order with Kohler's ranking rule. """ from pickle import dumps, loads, load from copy import copy as deepcopy from outrankingDigraphs import BipolarOutrankingDigraph from linearOrders import KohlerOrder from TransitiveDigraphs import PrincipalInOutDegreesOrdering Comments = False if Comments: print("Starting working on category %d" % (categID), end=" ") fiName = 'partialPerfTab-'+str(categID)+'.py' fi = open(fiName,'rb') pt = loads(fi.read()) fi.close() digraph = BipolarOutrankingDigraph(pt,Normalized=True) nc = digraph.order if Comments: print(nc) ko = KohlerOrder(digraph) catCRbc = ko.computeRankingByChoosing() splitCatRelation = [catCRbc['result']] foName = 'splitCatRelation-'+str(categID)+'.py' fo = open(foName,'wb') fo.write(dumps(splitCatRelation,-1)) fo.close() writestr = 'Finished category %d %d' % (categID,nc) return writestr ##### def _jobTaskKohlerFusion(categID): """ Task definition for multiprocessing threaded jobs in Quantiles Ranking. .. Note:: The indiviual quantile classes are linarly order with a fusion between Kohler's and Arrow-Raynaud's ranking rules. The latter corresponds to Kohler's rule applied to the codual outranking digraph. """ from pickle import dumps, loads, load from copy import copy as deepcopy from outrankingDigraphs import BipolarOutrankingDigraph from linearOrders import KohlerOrder from TransitiveDigraphs import PrincipalInOutDegreesOrdering Comments = False if Comments: print("Starting working on category %d" % (categID), end=" ") fiName = 'partialPerfTab-'+str(categID)+'.py' fi = open(fiName,'rb') pt = loads(fi.read()) fi.close() digraph = BipolarOutrankingDigraph(pt,Normalized=True) nc = digraph.order if Comments: print(nc) ko = KohlerOrder(digraph) kos = KohlerOrder((~(-digraph))) fk = FusionDigraph(ko,kos) catCRbc = KohlerOrder.computeRankingByChoosing(fk) #catRbc = deepcopy(catCRbc['result']) splitCatRelation = [catCRbc['result']] foName = 'splitCatRelation-'+str(categID)+'.py' fo = open(foName,'wb') fo.write(dumps(splitCatRelation,-1)) fo.close() writestr = 'Finished category %d %d' % (categID,nc) return writestr #####
[docs] class WeakCopelandOrder(TransitiveDigraph): """ instantiates the Weak Copeland Order from a given bipolar-valued Digraph instance If *SelfCoDual* == *True*, strict incomparabilities are coded as *indeterminate* situations. Only the asymmetric part of the preorder is instantiated. Otherwise, the classic definition of the weak order as complement of the preorder is instantiated. Note: *WithFairestRanking=True*, requires *other* to be a valid outranking digraph. """ def __init__(self,other,WithFairestRanking=False,SelfCoDual=True,Debug=False): """ constructor for generating a weak order from a given other digraph following the Copeland ordering rule. """ from copy import deepcopy from collections import OrderedDict from time import time from operator import itemgetter from digraphsTools import all_partial_perms #timings tt = time() runTimes = OrderedDict() # prepare local variables otherRelation = other.relation n = len(other.actions) actions = deepcopy(other.actions) gamma = other.gamma selfRelation = {} Min = Decimal('-1.0') Med = Decimal('0.0') Max = Decimal('1.0') valuationdomain = {'min': Min, 'med': Med, 'max': Max} runTimes['prepareLocals'] = time()-tt # compute Copeland scores tnf = time() c = PolarisedDigraph(other) cRelation = c.relation copelandScores = [] for x in actions: ## gx = gamma[x] copelandScore = Decimal('0') for y in actions: copelandScore += cRelation[x][y] - cRelation[y][x] ## copelandScore = len(gx[0]) - len(gx[1]) actions[x]['score'] = copelandScore copelandScores.append((copelandScore,x)) # sorting with keeping the actions initial ordering # in case of ties copelandScores.sort(key=itemgetter(0)) copelandOrder = [x[1] for x in copelandScores] self.copelandOrder = copelandOrder copelandScores.sort(key=itemgetter(0),reverse=True) copelandRanking = [x[1] for x in copelandScores] self.copelandRanking = copelandRanking self.copelandScores = copelandScores # computing the Copeland preorder pl = OrderedDict() for score in copelandScores: try: pl[score[0]].append(score[1]) except: pl[score[0]] = [score[1]] plist = [] for eq in pl: plist.append(pl[eq]) self.copelandPreRanking = plist # computing the Copeland permutations # and fairest Copeland Ranking # Requires *other* to be an outranking digraph instance if WithFairestRanking: perms = [] fairestPerms = [] for p in all_partial_perms(plist): perms.append(p) corr = other.computeRankingCorrelation(p) try: pRes = other.computeRankingConsensusQuality(p) except: print('Error: WithFairestRanking is only available for outranking instances!') return fairestPerms.append([float(pRes[1])-pRes[2],pRes[1], pRes[2],corr['correlation'],p]) fairestPerms.sort(key=itemgetter(0),reverse=True) self.copelandPermutations = fairestPerms self.fairestCopelandRanking = fairestPerms[0][4] # mearsuring run time runTimes['copeland'] = time() - tnf # init relation tr = time() for x in actions: selfRelation[x] = {} for y in actions: if SelfCoDual and x == y: selfRelation[x][y] = Med else: selfRelation[x][y] = Min sx = actions[x]['score'] sy = actions[y]['score'] if sx > sy: selfRelation[x][y] = Max elif SelfCoDual and sx == sy: selfRelation[x][y] = Med else: selfRelation[x][y] = Min runTimes['relation'] = time() - tr if Debug: print(selfRelation) self.name = other.name + '_ranked' self.actions = actions self.order = n self.valuationdomain = valuationdomain self.relation = selfRelation self.gamma = self.gammaSets() self.notGamma = self.notGammaSets() runTimes['totalTime'] = time() - tt self.runTimes = runTimes def showScores(self,direction='descending'): print('Copeland scores in %s order' % direction) print('action \t score') if direction == 'descending': for x in self.copelandScores: print('%s \t %.2f' %(x[1],-x[0])) else: for x in reversed(self.copelandScores): print('%s \t %.2f' %(x[1],-x[0]))
[docs] class WeakNetFlowsOrder(TransitiveDigraph): """ Instantiates the Weak NetFlows Order from a given bipolar-valued Digraph instance. If *SelfCoDual* == *True*, strict incomparabilities are coded as *indeterminate* situations. Only the asymmetric part of the preorder is instantiated. Otherwise, the classic definition of the weak order as complement of the preorder is instantiated. """ def __init__(self,other,SelfCoDual=True,Debug=False): """ Constructor for generating a weak order from a given other digraph following the NetFlows ordering rule """ #from copy import deepcopy from collections import OrderedDict from time import time from operator import itemgetter #timings tt = time() runTimes = OrderedDict() # prepare local variables otherRelation = other.relation n = len(other.actions) actions = other.actions gamma = other.gamma selfRelation = {} Min = Decimal('-1.0') Med = Decimal('0.0') Max = Decimal('1.0') valuationdomain = {'min': Min, 'med': Med, 'max': Max} runTimes['prepareLocals'] = time()-tt # compute net flows tnf = time() netFlowScores = [] for x in actions: netFlowScore = Decimal('0') for y in actions: netFlowScore += otherRelation[x][y] - otherRelation[y][x] if Debug: print(x,y,otherRelation[x][y],otherRelation[y][x],netFlowScore) actions[x]['score'] = netFlowScore netFlowScores.append((netFlowScore,x)) # reversed sorting with keeping the actions initial ordering # in case of ties netFlowScores.sort(reverse=True,key=itemgetter(0)) self.netFlowScores = netFlowScores netFlowsRanking = [x[1] for x in netFlowScores] self.netFlowsRanking = netFlowsRanking netFlowsOrder = list(reversed(netFlowsRanking)) self.netFlowsOrder = netFlowsOrder runTimes['netFlows'] = time() - tnf # init relation tr = time() for x in actions: selfRelation[x] = {} for y in actions: if SelfCoDual and x == y: selfRelation[x][y] = Med else: selfRelation[x][y] = Min sx = actions[x]['score'] sy = actions[y]['score'] if sx > sy: selfRelation[x][y] = Max elif SelfCoDual and sx == sy: selfRelation[x][y] = Med else: selfRelation[x][y] = Min runTimes['relation'] = time() - tr if Debug: print(selfRelation) self.name = other.name + '_ranked' self.actions = actions self.order = n self.valuationdomain = valuationdomain self.relation = selfRelation self.gamma = self.gammaSets() self.notGamma = self.notGammaSets() runTimes['totalTime'] = time() - tt self.runTimes = runTimes def showScores(self,direction='descending'): print('NetFlows scores in %s order' % direction) print('action \t score') if direction == 'descending': for x in self.netFlowScores: print('%s \t %.2f' %(x[1],x[0])) else: for x in reversed(self.netFlowScores): print('%s \t %.2f' %(x[1],x[0]))
######### # compatibility with obsolete weakOrders module #######################
[docs] class PartialRanking(RankingsFusion): """ dummy class for backward compatibility. """
[docs] class KemenyWeakOrder(KemenyOrdersFusion): """ dummy class for backward compatibility. """
[docs] class PrincipalInOutDegreesOrdering(PrincipalInOutDegreesOrderingFusion): """ dummy class for backward compatibility. """
[docs] class WeakOrder(TransitiveDigraph): """ dummy class for backward compatibility. """
#################### #----------test outrankingDigraphs classes ---------------- if __name__ == "__main__": from digraphs import * from outrankingDigraphs import * from sortingDigraphs import * from transitiveDigraphs import * from linearOrders import * from votingProfiles import * from time import time print(""" **************************************************** * Digraph3 transitiveDigraphs module * * depends on BipolarOutrankingDigraph and * * $Revision$ Python3.9 * * Copyright (C) 2010-2021 Raymond Bisdorff * * The module comes with ABSOLUTELY NO WARRANTY * * to the extent permitted by the applicable law. * * This is free software, and you are welcome to * * redistribute it if it remains free software. * **************************************************** """) # v = RandomLinearVotingProfile(numberOfVoters=99, numberOfCandidates=9,seed=201) # g = CondorcetDigraph(v) #### g = RandomBipolarOutrankingDigraph(Normalized=True) ## rbc = RankingByChoosingDigraph(g,Threading=False,Debug=True) ## rbc.showRankingByChoosing() ## print(rbc) ## rbc.exportTopologicalGraphViz('test') ## rbc.exportGraphViz('test1') ## pt = RandomCBPerformanceTableau(numberOfCriteria=5,seed=1000) g = BipolarOutrankingDigraph(pt) wcg = WeakCopelandOrder(g,WithFairestRanking=True) print(wcg.copelandPermutations) print(wcg.copelandPreRanking) print(wcg.fairestCopelandRanking) print('*------------------*') print('If you see this line all tests were passed successfully :-)') print('Enjoy !')