- 论坛徽章:
- 2
|
使用pygame
代码如下:- #!/usr/bin/python
- """
- a python snake AI, which use the A* algorithm
- just for fun
- you can see more details from this website.
- http:@//inventwithpython.com/blog/2013/04/22/multithreaded-python-tutorial-with-threadworms/
- """
- import random, pygame, sys, threading, time
- import sys
- from pygame.locals import *
- from Queue import Queue
- #the food move direction
- moveLeft = True
- moveRight = False
- moveUp = False
- moveDown = False
- MAXINT = 9999
- i =0
- # Setting up constants
- NUM_SNAKES = 3 # the number of snakes in the grid
- FPS = 30 # frames per second that the program runs
- CELL_SIZE = 40 # how many pixels wide and high each "cell" in the grid is
- CELLS_WIDE = 25 # how many cells wide the grid is
- CELLS_HIGH = 15 # how many cells high the grid is
- food_queue = Queue(1) #when the Queue is not empty,means has created the food square,
- #else havn't createdd one
- HAS_FREE = True
- FREE_CELLS =CELLS_WIDE*CELLS_HIGH #free cell num
- FOOD_L =[None,None]
- GRID = []
- for x in range(CELLS_WIDE):
- GRID.append([None] * CELLS_HIGH)
- GRID_LOCK = threading.Lock() # pun was not intended, when to set a square's color, you need lock the grid
- FOOD_LOCATION = threading.Event()
- # Constants for some colors.
- # R G B
- WHITE = (255, 255, 255)
- BLACK = ( 0, 0, 0)
- DARKGRAY = ( 40, 40, 40)
- BGCOLOR = BLACK # color to use for the background of the grid
- GRID_LINES_COLOR = DARKGRAY # color to use for the lines of the grid
- # Calculate total pixels wide and high that the full window is
- WINDOWWIDTH = CELL_SIZE * CELLS_WIDE
- WINDOWHEIGHT = CELL_SIZE * CELLS_HIGH
- UP = 'up'
- DOWN = 'down'
- LEFT = 'left'
- RIGHT = 'right'
- HEAD = 0
- BUTT = -1 # negative indexes count from the end, so -1 will always be the last index
- # A global variable that the Snake threads check to see if they should exit.
- SNAKE_RUNNING = True
- #Node will be used in findpath
- class Node():
- """in the A* algorithm, f is current distance to the target position,
- father is previous node,
- g is the distance which the snake has passed.
- curr_l is the snake's head postion (x, y)
- """
- def __init__(self, f=None, father=None, g=None, curr_l=None):
- if f is not None:
- self.f = f
- else:
- self.f = MAXINT
- if g is not None:
- self.g = g
- else:
- self.g = 0
- if father is not None:
- self.father = father
- else:
- self.father = None
- if curr_l is not None:
- self.curr_l = curr_l
- else:
- self.curr_l = None
- class Food(threading.Thread):
- """the snake eat this objet"""
- def __init__(self, color=None, maxsize=None):
- global FREE_CELLS, FOOD_L
- threading.Thread.__init__(self)
- if color == None:
- self.color = (255, 255, 255)
- else:
- self.color = color
- if maxsize == None:
- self.maxsize = 1
- else:
- self.maxsize = maxsize
- self.body = []
- GRID_LOCK.acquire()
- #random the food's (x, y) location
- while True:
- startx = random.randint(0, CELLS_WIDE-1)
- starty = random.randint(0, CELLS_HIGH-1)
- if GRID[startx][starty] is None:
- break;
- GRID[startx][starty] = self.color
- FOOD_LOCATION.set() #food position has been setted, notify the check_foodlocation func
-
- self.food_location = [ {'x':startx, 'y':starty} ]
- FOOD_L[0] = self.food_location[0]['x']
- FOOD_L[1] = self.food_location[0]['y']
- self.body.insert(0, {'x':FOOD_L[0], 'y':FOOD_L[1]})
- food_queue.put('has food') #when the food has been setted, put this word into food_queue to
- #notify other thread function
- GRID_LOCK.release()
- def new_food(self): #create a new food square if it has been eaten by single snake
- global FREE_CELLS, HAS_FREE, FOOD_L, food_queue
- GRID_LOCK.acquire()
- while True:
- startx = random.randint(0, CELLS_WIDE-1)
- starty = random.randint(0, CELLS_HIGH-1)
- if GRID[startx][starty] is None:
- break;
- print 'food', startx, starty
- GRID[startx][starty] = self.color
- FOOD_LOCATION.set()
- FREE_CELLS -= 1
- if FREE_CELLS == 0:
- HAS_FREE = False #when all the gird has the color the produce need to be stopped
- self.food_location = [ {'x':startx, 'y':starty} ]
- FOOD_L[0] = self.food_location[0]['x']
- FOOD_L[1] = self.food_location[0]['y']
- food_queue.put('has food')
- print 'new_food at [ %s , %s ]' %(FOOD_L[0], FOOD_L[1])
- GRID_LOCK.release()
- def run(self):
- global SNAKE_RUNNING
- while True:
- if not SNAKE_RUNNING:
- print 'snakes terminate'
- return
- if food_queue.empty():
- if HAS_FREE == True: # still some grid without color, create one
- self.new_food()
- else:
- SNAKE_RUNNING = False
- time.sleep(0.01)
-
- class Snake(threading.Thread): # "Thread" is a class in the "threading" module.
- def __init__(self, name='Snake', thread_name=None, maxsize=None, color=None, speed=None):
- # name can be used for debugging purposes. It will appear in any thrown exceptions so you can tell which thread crashed.
- # maxsize is the length of the snake (in body segments).
- # color is an RGB tuple for the snake. The darker shade is automatically calculated.
- # speed is an integer of milliseconds the snake waits after moving once. 1000=move once a second, 0=move as fast as possible
- global FREE_CELLS, FOOD_L
- threading.Thread.__init__(self) # since we are overriding the Thread class, we need to first call its __init__() method.
- self.name = name
- self.thread_name = thread_name
- self.maxsize = 1
- self.OPEN = {}
- self.CLOSE = {}
- self.path = []
- self.thread_name = thread_name
- # Set the color to the parameter, or to a random color.
- if color is None:
- self.color = (random.randint(60, 255), random.randint(60, 255), random.randint(60, 255))
- else:
- self.color = color
- # Set the speed to the parameter, or to a random number.
- if speed is None:
- self.speed = random.randint(20, 50) # wait time before movements will be between 0.2 and 0.5 seconds
- else:
- self.speed = speed
- GRID_LOCK.acquire() # block until this thread can acquire the lock
- while True:
- startx = random.randint(0, CELLS_WIDE - 1)
- starty = random.randint(0, CELLS_HIGH - 1)
- if GRID[startx][starty] is None:
- break # we've found an unoccupied cell in the grid
- print 'snake [ %s ] start at (%s, %s)' %(self.thread_name, startx, starty)
- GRID[startx][starty] = self.color # modify the shared data structure
- FREE_CELLS = FREE_CELLS - self.maxsize
- GRID_LOCK.release()
- # The snake's body starts as a single segment, and keeps growing until it
- # reaches full length. This makes setup easier.
- self.body = [{'x': startx, 'y': starty}]
- self.direction = random.choice((UP, DOWN, LEFT, RIGHT))
- self.start_l = (startx, starty)
- self.end_l = None
- self.nextx = 999
- self.nexty = 999
- def run(self):
- global FOOD_L
- isfirst = True
- isrun = False
- old_start_l = self.start_l
- old_end_l = self.end_l
- chthread_run = False
- while True:
- if chthread_run == False:
- t = threading.Thread(target=self.check_foodlocation) #start another thread to check whether the
- #food location has been changed (use A* algorithm)
- t.start()
- chthread_run = True
- if not SNAKE_RUNNING:
- return # A thread terminates when run() returns.
- # Randomly decide to change direction
- #if random.randint(0, 100) < 20: # 20% to change direction
- # self.direction = random.choice((UP, DOWN, LEFT, RIGHT))
-
- GRID_LOCK.acquire()
- if food_queue.empty():
- GRID_LOCK.release() #release the grid lock and the let the food thread
- #has access to get it and create a food on the free grid
- time.sleep(0.01)
- else:
- start_l = (self.body[0]['x'], self.body[0]['y'])
- end_l = (FOOD_L[0], FOOD_L[1])
- self.start_l = start_l
- self.end_l = end_l
- old_start_l = start_l
- old_end_l = end_l
- print "thread [ %s ] start_l = %s" %(self.thread_name, str(start_l))
- print "thread [ %s ] end_l = %s" %(self.thread_name, str(end_l))
- if isfirst == True:
- self.end_l = end_l
- self.OPEN.clear()
- self.CLOSE.clear()
- del self.path[:]
- result = self.find_path(start_l, end_l)
- for path in self.path:
- self.reverse_time = 0
- print path,
- if result == -1: #can't find the path to reache the food location
- if self.reverse_time == 0:
- self.body.reverse() #reverse the snake body(but it may be wrong when
- self.reverse_time = 1 #there is only a litte free grid left.
- isfirst = False
- if self.end_l != end_l: #the food location has been changed
- FOOD_LOCATION.set()
- GRID_LOCK.release()
- # time.sleep(0.01)
- continue
-
- if len(self.path) > 1:
- next = self.path[1]
- self.nextx = next[0]
- self.nexty = next[1]
- if len(self.path) == 1:
- next = self.path[0]
- self.nextx = next[0]
- self.nexty = next[1]
-
- if GRID[self.nextx][self.nexty] is not None:
- if GRID[self.nextx][self.nexty] != WHITE: #the location which the snake wanna move
- #is in the other's snake's body
- #WHITE means food
- print (self.nextx, self.nexty)
- print GRID[self.nextx][self.nexty]
- print self.thread_name, " it's not None, caculate again", (self.nextx, self.nexty)
- FOOD_LOCATION.set() #update the path, the older one can't pass
- GRID_LOCK.release()
- time.sleep(0.01)
- continue
- else:
- print 'get the food'
- self.maxsize += 1
- self.OPEN.clear()
- self.CLOSE.clear()
- food_queue.get() #the food has beend eaten by this snake thread
- if len(self.path) > 0: #self.paht record the path to the food location
- #print 'delete self.path[0]' ,self.path[0]
- self.path.pop(0) #delete head
-
- GRID[self.nextx][self.nexty] = self.color # update the GRID state
- self.body.insert(0, {'x': self.nextx, 'y': self.nexty}) # update this snake's own state
- self.start_l = (self.nextx, self.nexty)
- if food_queue.empty():
- del self.path[:]
- # Check if we've grown too long, and cut off tail if we have.
- # This gives the illusion of the snake moving.
- if len(self.body) > self.maxsize:
- GRID[self.body[BUTT]['x']][self.body[BUTT]['y']] = None # update the GRID state
- del self.body[BUTT] #it's like the snake has moved to the next position
- pygame.time.wait(self.speed)
- GRID_LOCK.release()
- time.sleep(0.01)
- def check_foodlocation(self): #check if food position has changed
- while True:
- global food_queue, FOOD_L, FOOD_LOCATION
- FOOD_LOCATION.wait() #all the snake check path thread activate by the FOOD_LOCATION, when
- #the new food event come up
- FOOD_LOCATION.clear()
- if food_queue.full() and self.end_l is not None:
- GRID_LOCK.acquire() #when to calaute the path again, you need the grid to
- #lock all the grid first
- self.OPEN.clear()
- self.CLOSE.clear()
- del self.path[:]
- self.end_l = (FOOD_L[0], FOOD_L[1])
- print 'the food position has been changed to [%s, %s]' %(FOOD_L[0], FOOD_L[1])
- result = self.find_path(self.start_l, self.end_l) #find the path to food again
- if result == -1:
- self.body.reverse()
- GRID_LOCK.release()
-
- def find_path(self, start_l, end_l): #use the A* algorithm
- node = Node()
- node.curr_l = start_l
- node.father = None
- self.OPEN[start_l] = node
- isfirst = True
- while True:
- if isfirst == True:
- dict_item = self.OPEN.pop(start_l, None)
- if dict_item is not None:
- self.CLOSE[start_l] = dict_item
- self.isaround_ok(start_l, end_l)
- else:
- print 'OPEN is None'
- isfirst = False
- else:
- if len(self.OPEN) == 0:
- print "can't find distination"
- return -1
- curr_l = self.find_min()
- #print curr_l,'************************************'
- dict_item = self.OPEN.pop(curr_l, None)
- if dict_item is not None:
- self.CLOSE[curr_l] = dict_item
- if curr_l == end_l:
- print 'ok find it'
- self.get_path(curr_l)
- return 0
- else:
- self.isaround_ok(curr_l, end_l)
- def isaround_ok(self, start_l, end_l): # to find the next position, but it must in
- # the range of grid, and must not in CLOSE dict
- # any details please see the principle of A star algorithm
- start_x = start_l[0]
- start_y = start_l[1]
- end_x = end_l[0]
- end_y = end_l[1]
-
- father = self.CLOSE[start_l]
- location = []
- if start_x-1 != -1:
- location.append( (start_x-1, start_y) )
-
- if start_x+1 != CELLS_WIDE:
- location.append( (start_x+1, start_y) )
- if start_y-1 != -1:
- location.append( (start_x, start_y-1) )
-
- if start_y+1 != CELLS_HIGH:
- location.append( (start_x, start_y+1) )
- if len(location) == 0:
- return;
-
- for curr_l in location:
- x = curr_l[0]
- y = curr_l[1]
- #print (x, y)
- if curr_l not in self.CLOSE.keys() and (GRID[x][y] == WHITE or GRID[x][y] == None):
- if father is not None:
- g = father.g+1
- else:
- g = 1
- h = self.judge_dis(curr_l, end_l)
- f_new = g+h
- if curr_l in self.OPEN.keys():
- if f_new < father.f:
- self.OPEN[curr_l].father = father
- self.OPEN[curr_l].f = f_new
- self.OPEN[curr_l].g = g
- #not in OPEN[]
- else:
- node = Node()
- node.curr_l = curr_l
- node.g = g
- node.father = father
- node.f = f_new
- self.OPEN[curr_l] = node
- def find_min(self):
- min = 9999
- key = None
- for eachkey in self.OPEN.keys():
- f = self.OPEN[eachkey].f
- if f<min:
- key = eachkey
- min = f
- return key
- def judge_dis(self, start_l, end_l):
- start_x = start_l[0]
- start_y = start_l[1]
- end_x = end_l[0]
- end_y = end_l[1]
- h = abs(end_x -start_x) + abs(end_y -start_y)
- return h
-
- def get_path(self, curr_l):
- self.path.append(curr_l)
- father = self.CLOSE[curr_l].father
- while father is not None:
- curr_l = father.curr_l
- self.path.append(curr_l)
- father = self.CLOSE[curr_l].father
- self.path.reverse()
- def main():
- global FPSCLOCK, DISPLAYSURF, FREE_CELLS
- # Draw some walls on the grid
- # Pygame window set up.
- pygame.init()
- FPSCLOCK = pygame.time.Clock()
- DISPLAYSURF = pygame.display.set_mode((WINDOWWIDTH, WINDOWHEIGHT))
- pygame.display.set_caption('Threadsnake')
- # Create the snake objects.
- snakes = [] # a list that contains all the snake objects
- j = 0;
- for i in range(NUM_SNAKES):
- j += 1
- snakes.append(Snake('snake', j, None, None, None) )
- snakes[-1].start() # Start the snake code in its own thread.
-
- #To creat the food(the white square is the food)
- food = Food()
- food.start()
- while True: # main game loop
- handleEvents()
- drawGrid()
- pygame.display.update()
- FPSCLOCK.tick(FPS)
- def handleEvents():
- # The only event we need to handle in this program is when it terminates.
- global SNAKE_RUNNING, moveLeft, moveRight, moveUp, moveDown
- for event in pygame.event.get(): # event handling loop
- if event.type == QUIT:
- SNAKE_RUNNING = False # Setting this to False tells the Snake threads to exit.
- pygame.quit()
- sys.exit()
- if event.type == KEYDOWN:
- print 'Kyedown !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'
- if event.key == K_ESCAPE:
- SNAKE_RUNNING = False # Setting this to False tells the Snake threads to exit.
- pygame.quit()
- sys.exit()
- if event.key == K_LEFT and moveRight==False:
- moveLeft = True
- moveRight = False
- moveUp = False
- moveDown = False
- if event.key == K_RIGHT and moveLeft==False:
- moveLeft = False
- moveRight = True
- moveUp = False
- moveDown = False
- if event.key == K_UP and moveDown==False:
- moveLeft = False
- moveRight = False
- moveUp = True
- moveDown = False
- if event.key == K_DOWN and moveUp==False:
- moveLeft = False
- moveRight = False
- moveUp = False
- moveDown = True
-
- def drawGrid():
- # Draw the grid lines.
- DISPLAYSURF.fill(BGCOLOR)
- for x in range(0, WINDOWWIDTH, CELL_SIZE): # draw vertical lines
- pygame.draw.line(DISPLAYSURF, GRID_LINES_COLOR, (x, 0), (x, WINDOWHEIGHT))
- for y in range(0, WINDOWHEIGHT, CELL_SIZE): # draw horizontal lines
- pygame.draw.line(DISPLAYSURF, GRID_LINES_COLOR, (0, y), (WINDOWWIDTH, y))
- # The main thread that stays in the main loop (which calls drawGrid) also
- # needs to acquire the GRID_LOCK lock before modifying the GRID variable.
- GRID_LOCK.acquire()
- for x in range(0, CELLS_WIDE):
- for y in range(0, CELLS_HIGH):
- if GRID[x][y] is None:
- continue # No body segment at this cell to draw, so skip it
- color = GRID[x][y] # modify the GRID data structure
- # Draw the body segment on the screen
- darkerColor = (max(color[0] - 50, 0), max(color[1] - 50, 0), max(color[2] - 50, 0))
- pygame.draw.rect(DISPLAYSURF, darkerColor, (x * CELL_SIZE, y * CELL_SIZE, CELL_SIZE, CELL_SIZE ))
- pygame.draw.rect(DISPLAYSURF, color, (x * CELL_SIZE + 4, y * CELL_SIZE + 4, CELL_SIZE - 8, CELL_SIZE - 8))
- GRID_LOCK.release() # We're done messing with GRID, so release the lock.
- def setGridSquares(squares, color=(192, 192, 192)):
- squares = squares.split('\n')
- if squares[0] == '':
- del squares[0]
- if squares[-1] == '':
- del squares[-1]
- GRID_LOCK.acquire()
- for y in range(min(len(squares), CELLS_HIGH)):
- for x in range(min(len(squares[y]), CELLS_WIDE)):
- if squares[y][x] == ' ':
- GRID[x][y] = None
- elif squares[y][x] == '.':
- pass
- else:
- GRID[x][y] = color
- GRID_LOCK.release()
- if __name__ == '__main__':
- main()
复制代码 |
|