"""
Python3 implementation of a solver for **dynamic programming** problems.
Copyright (C) 2023 Raymond Bisdorff
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
from transitiveDigraphs import TransitiveDigraph
[docs]
class DynamicProgrammingDigraph(TransitiveDigraph):
"""
Implementation of the *Bellman*, *Dijkstra*, or *Viterbi* min-sum algorithm for
solving the canonical dynamic programming problem
(see *Numerical Recipes, the Art of Scientific Computing* 3rd Ed. (2007),
W.H. Press, S.A. Teukolsky, W.T: Vetterling and B.P. Flannery, Cambridge Unviersity Press,
ISBN 978-0-521-88068-8, pp 555-562).
We provide the lowest cost, respectively highest benefit, path from a single *source* node
to a single *sink* node via an integer number *nstages* of
execution stages characterized by finite subsets of execution states.
Each forward arc from a state *x* in stage *i* to a state *y* in stage *i+1* is
labelled with a decimal value stored, like the digraph relation,
in a double dictionary *self.costs* attribute.
"""
def __init__(self,fileName=None,Debug=False):
from decimal import Decimal
from copy import deepcopy
if fileName is None:
print('Error: the name of a stored DPdigraph file is required')
else:
fileNameExt = fileName+'.py'
argDict = {}
fi = open(fileNameExt,'r')
fileText = fi.read()
fi.close()
exec(compile(fileText, fileNameExt, 'exec'),argDict)
self.name = fileName
self.actions = argDict['actions']
self.order = len(self.actions)
self.valuationdomain = argDict['valuationdomain']
self.relation = argDict['relation']
self.size = self.computeSize()
self.gamma = self.gammaSets()
self.notGamma = self.notGammaSets()
self.source = argDict['source']
self.sink = argDict['sink']
self.stages = self.computeStages(Debug=Debug)
self.costsRange = argDict['costsRange']
self.preferenceDirection = argDict['preferenceDirection']
self.costs = argDict['costs']
self.optimalPath = self.computeDynamicProgrammingSolution(Debug=Debug)
[docs]
def save(self,fileName='tempDPdigraph',decDigits=2):
"""
Persistent storage of a dynamic programming problem
in the format of a python source code file. The stored file may be reloaded with the
:py:class:`~dynamicProgramming.DynamicProgrammingDigraph` class.
*self.stages*, *self.optimalPath* and self.bestSum* attributes are automatically
added by the class constructor.
"""
print('*--- Saving DP digraph in file: <' + fileName + '.py> ---*')
actions = self.actions
relation = self.relation
costs = self.costs
costsRange = self.costsRange
prefDir = self.preferenceDirection
Min = self.valuationdomain['min']
Med = self.valuationdomain['med']
Max = self.valuationdomain['max']
fileNameExt = str(fileName)+str('.py')
fo = open(fileNameExt, 'w')
fo.write('# Saved dynamic programming problem\n')
fo.write('from decimal import Decimal\n')
# write actions
fo.write('from collections import OrderedDict\n')
fo.write('actions = OrderedDict([\n')
for x in actions:
fo.write('(\'' + str(x) + '\',\n')
try:
fo.write(str(actions[x])+'),\n')
except:
fo.write('{\'name\': \'%s\'}),\n' % str(x))
fo.write('])\n')
fo.write('source = \'%s\'\n' % self.source)
fo.write('sink = \'%s\'\n' % self.sink)
# write relation
IntegerValuation = self.valuationdomain['hasIntegerValuation']
if not IntegerValuation:
fo.write('valuationdomain = {\'hasIntegerValuation\': False, \'min\': Decimal("'+str(Min)+'"),\'med\': Decimal("'+str(Med)+'"),\'max\': Decimal("'+str(Max)+'")}\n')
else:
fo.write('valuationdomain = {\'hasIntegerValuation\': True, \'min\': Decimal("'+str(Min)+'"),\'med\': Decimal("'+str(Med)+'"),\'max\': Decimal("'+str(Max)+'")}\n')
fo.write('relation = {\n')
for x in actions:
fo.write('\'' + str(x) + '\': {\n')
for y in actions:
if not IntegerValuation:
valueString = '\': Decimal(\'%%.%df\'),\n' % (decDigits)
fo.write('\'' + str(y) + (valueString % relation[x][y]))
#fo.write('\'' + str(y) + '\': Decimal("' + str(relation[x][y]) + '"),\n')
else:
fo.write('\'' + str(y) + '\':' + str(relation[x][y]) + ',\n')
fo.write('},\n')
fo.write( '}\n')
# write costs
valueString = 'Decimal(\'%%.%df\')' % (decDigits)
fo.write('costsRange = (' + (valueString % costsRange[0]) \
+ ',' + (valueString % costsRange[1]) + ')\n' )
fo.write('preferenceDirection = \'%s\'\n' % prefDir)
fo.write('costs = {\n')
for x in actions:
fo.write('\'' + str(x) + '\': {\n')
for y in actions:
if not IntegerValuation:
valueString = '\': Decimal(\'%%.%df\'),\n' % (decDigits)
fo.write('\'' + str(y) + (valueString % costs[x][y]))
else:
fo.write('\'' + str(y) + '\':' + str(costs[x][y]) + ',\n')
fo.write('},\n')
fo.write( '}\n')
fo.close()
[docs]
def computeStages(self,Debug=False):
"""
Renders the decomposition of a
:py:class:`~dynamicProgramming.DynamicProgrammingDigraph` object into a list
of successive stages --subsets of states-- by taking the progessive union of the *gama* sets,
starting from a single *self.source* node and ending in a single *self.sink* node.
"""
stages = []
source = self.source
sink = self.sink
stages.append([source])
spl = self.computeShortestPathLengths()
nstages = spl[source][sink]
for i in range(1,nstages+1):
if Debug:
print('stage: ',i)
neighbours = set()
for x in stages[i-1]:
nbx = self.gamma[x][0]
if Debug:
print(x,nbx)
neighbours |= nbx
nbList = list(neighbours)
nbList.sort()
stages.append(nbList)
if Debug:
print(stages)
if len(stages[0]) != 1 and len(stages[-1]) != 1:
print('Error: the given digraph is not a valid dynamic programming diagram')
else:
return stages
[docs]
def computeDynamicProgrammingSolution(self,Debug=False):
"""
The *Bellmann*, *Dijkstra*, or *Viterbi* dynamic programming algorithms a.o.,
all proceed with a recursive forward-computing of best paths
from stage *i-1* to stage *i*.
In a second step, the overall best path is determined by
backwards-selecting in each stage the best predecessor state.
The resulting optimal path is stored in the *self.optimalPath* attribute
and its global sum of costs (*prefernceDirection* == 'min'),
respectively benefits (*prefernceDirection* == 'max', *negative* costs)
is stored in the *self.bestSum* attribute.
"""
from decimal import Decimal
sink = self.sink
source = self.source
stages = self.stages
costsRange = self.costsRange
prefDir = self.preferenceDirection
nstate = []
nstages = len(stages)
Med = self.valuationdomain['med']
# forward computing best paths until satge *i*
for i in range(nstages):
#print(i,stages[i])
nstate.append(len(stages[i]))
Big = nstages * costsRange[1]
best = {}
best[0] = {}
best[0][0] = Decimal('0')
for i in range(1,nstages):
best[i] = {}
for k in range(nstate[i]):
xk = stages[i][k]
b = Big
for j in range(nstate[i-1]):
yj = stages[i-1][j]
if self.relation[yj][xk] > Med:
if prefDir == 'min':
a = best[i-1][j] + self.costs[yj][xk]
else:
a = best[i-1][j] - self.costs[yj][xk]
if a < b:
b = a
best[i][k] = b
if Debug:
self.best = best
self.bestSum = abs(best[nstages-1][0])
# Determine by backward inspec tion the best path from the sink to the source
answer = [0 for i in range(nstages)]
if Debug:
print(answer)
for i in range(nstages-2,0,-1):
if Debug:
print('>>>', i)
k = answer[i+1]
b = best[i+1][k]
if Debug:
print(i,b)
for j in range(nstate[i]):
xk = stages[i+1][k]
yj = stages[i][j]
if Debug:
print(j,xk,yj)
if self.relation[yj][xk] > self.valuationdomain['med']:
if prefDir == 'min':
temp = best[i][j] + self.costs[yj][xk]
else:
temp = best[i][j] - self.costs[yj][xk]
if Debug:
print('b,best,temp',b,best[i][j],temp)
if b == temp:
if Debug:
print('b,temp',b,temp)
answer[i] = j
break
if Debug:
print(answer)
optimalPath = []
for i in range(len(answer)):
if Debug:
print(stages[i][answer[i]])
optimalPath.append(stages[i][answer[i]])
return optimalPath
from dynamicProgramming import DynamicProgrammingDigraph
[docs]
class RandomDynamicProgrammingDigraph(DynamicProgrammingDigraph):
"""
Generator for creating random dynamic programming digraphs.
- *preferenceDirection* = 'min' (default) | 'max'
- *maxStages* = maximal number of stages
- *costsRange(a,b)* : integer limits (*a* < *b*) for the random generation of arc labels
Example Python session:
>>> from dynamicProgramming import RandomDynamicProgrammingDigraph
>>> dg = RandomDynamicProgrammingDigraph(
... order=12,
... maxStages=4,
... costsRange=(5,10),
... preferenceDirection='min',
... seed=2)
>>> dg
*------- Digraph instance description ------*
Instance class : RandomDynamicProgrammingDigraph
Instance name : randomDPdigraph
Digraph Order : 12
Digraph Size : 28
Valuation domain : [-1.00;1.00]
Determinateness (%) : 80.30
Attributes : ['name', 'order', 'actions', 'valuationdomain',
'relation', 'gamma', 'notGamma',
'costsRange', 'preferenceDirection',
'costs', 'source', 'sink', 'shortestPathLengths',
'stages', 'nstages',
'bestSum', 'optimalPath']
>>> print(dg.optimalPath)
['a01', 'a09', 'a02', 'a05', 'a12']
>>> print(dg.bestSum)
25.0
>>> dg.exportGraphViz('testDP',WithBestPathDecoration=True)
*---- exporting a dot file for GraphViz tools ---------*
Exporting to testDP.dot
dot -Grankdir=TB -Tpng testDP.dot -o testDP.png
*Figure*: The path that minimizes the sum of the costs labels
.. image:: testDP.png
:width: 400 px
:align: center
:alt: The dynamic programming solution
"""
def __init__(self,order=12,maxStages=4,costsRange=(5,10),
preferenceDirection='min',
seed=None,Debug=False):
from randomDigraphs import RandomDigraph
from decimal import Decimal
from collections import OrderedDict
from copy import deepcopy
import random
random.seed(seed)
g = RandomDigraph(order=order,seed=seed)
actionsList = [x for x in g.actions]
source = actionsList[0]
sink = actionsList[-1]
Max = g.valuationdomain['max']
Med = g.valuationdomain['med']
Min = g.valuationdomain['min']
if Debug:
print(actionsList,source,sink)
actionsStage = {source: 0, sink: maxStages}
for i in range(1,order-1):
x = actionsList[i]
actionsStage[x] = random.randint(1,maxStages-1)
if Debug:
print(actionsStage)
for i in range(order):
x = actionsList[i]
xst = actionsStage[x]
for j in range(order):
y = actionsList[j]
yst = actionsStage[y]
if x == y:
g.relation[x][y] = Med
if xst - yst == -1:
g.relation[x][y] = Max
elif xst - yst == 1:
g.relation[x][y] = Min
else:
g.relation[x][y] = Med
if Debug:
g.showRelationTable()
costs = {}
for i in range(order):
x = actionsList[i]
costs[x] = {}
for j in range(order):
y = actionsList[j]
if g.relation[x][y] > Med:
costs[x][y] = random.randint(costsRange[0],costsRange[1])
else:
costs[x][y] = Decimal('0')
## if x == y:
## costs[x][y] = Decimal('0')
## else:
## costs[x][y] = random.randint(costsRange[0],costsRange[1]) \
## * g.relation[x][y]
if Debug:
print(costs)
self.name = 'randomDPdigraph'
self.order = order
self.actions = deepcopy(g.actions)
self.valuationdomain = {'min':Min,'med':Med,'max':Max,
'hasIntegerValuation': False}
self.relation = deepcopy(g.relation)
self.closeTransitive(Reverse = True)
self.costsRange = costsRange
self.preferenceDirection = preferenceDirection
self.costs = costs
self.gamma = self.gammaSets()
self.notGamma = self.notGammaSets()
self.source = source
self.sink = sink
self.stages = self.computeStages(Debug=Debug)
self.nstages = len(self.stages)
self.optimalPath = self.computeDynamicProgrammingSolution(Debug=Debug)
# --------------------
###############################
if __name__ == '__main__':
print("""
****************************************************
* Digraph3 dynamicProgramming module *
* Revision: Python3.10 *
* Copyright (C) 2023 Raymond Bisdorff *
* The module comes with ABSOLUTELY NO WARRANTY *
* to the extent permitted by the applicable law. *
* This is free software, and you are welcome to *
* redistribute it if it remains free software. *
****************************************************
""")
###### scratch pad for testing the module components
dg = RandomDynamicProgrammingDigraph(order=12,
maxStages=4,
costsRange=(5,10),
preferenceDirection='min',
seed=2)
print(dg.optimalPath)
print(dg.bestSum)
print(dg.preferenceDirection)
dg.exportGraphViz('testDP',WithBestPathDecoration=True)
dg.save()
dg1 = DynamicProgrammingDigraph('tempDPdigraph')
print(dg1.optimalPath)
print(dg1.bestSum)
print(dg1.preferenceDirection)
print('*------------------*')
print('If you see this line all tests were passed successfully :-)')
print('Enjoy !')
#####################################