Monday, March 31, 2014

some basic python syntax

1) list
# empty list
a = []
print a

# list with initial data
a= [1, 2, 3]
print a

# aliasing versus copying
a = [1, 2, 3]
print id(a), a

# alias (not a copy): b is bound to the very same list object as a,
# so id(b) equals id(a)
b = a
print id(b), b

# shallow copy: list(a) builds a new list object holding the same elements
# (a real deep copy of nested data would need copy.deepcopy)
c = list(a)
print id(c), c

# mutate through a: the alias b shows the change, the copy c does not
a[0] = 2
print id(a), a
print id(b), b
print id(c), c

# shallow copy again, this time with a full slice
d = a[:]
print id(d)

2) argument
#e.g.: python argument.py -f file1 --cov "code coverage" -q arg1 arg2
from optparse import OptionParser
import sys

# optparse replaces %prog with the name of the running script
usage = "usage: %prog [options] arg1 arg2"
parser = OptionParser(usage=usage)
# -f/--file takes a value and stores it as options.filename
parser.add_option(
    "-f", "--file", dest="filename", help="write report to FILE", metavar="FILE"
)
# --cov takes a string value stored as options.run_with_cov; note that the
# default used when the flag is omitted is the boolean False, not a string
parser.add_option(
    "--cov", type="string", dest="run_with_cov", default=False, help="coverage option"
)
# -q/--quiet is a flag without a value: it sets options.verbose to False
# (options.verbose is True unless the flag is given)
parser.add_option(
    "-q", "--quiet", action="store_false", dest="verbose", default=True, help="don't print status messages to stdout"
)

# options holds the parsed option values, args the remaining positional arguments
(options, args) = parser.parse_args()

print len(args)
print args

# Two equivalent ways to list every option and its value:
# 1) walk the instance dictionary of the options object
for opt, value in options.__dict__.items():
    print opt, ":" , value

# 2) use vars()/getattr() instead of touching __dict__ directly
for option in vars(options):
    value = getattr(options, option)
    print option, ":" , value

# only report the file name when -f/--file was supplied with a non-empty value
if options.filename:
    print "filename is %s..." % options.filename

3) while
import re
while True:
    # Get user input
    print "Enter the name of folder in the current folder:"
    folder1 = raw_input("folder = ")

    #remove space in head and tail
    folder = re.sub(r"^\s+", "", folder)
    folder = re.sub(r"\s+$", "", folder)

    # check if folder1 is valid sub-folders within the current folder
    if not exists(folder):
        print "folder", folder, "doesn't exist\n"
        continue

    if not isdir(folder):
        print "folder", folder, "is not a directory\n"
        continue
 
    #if there is no problem, break the while loop
    break

4) stdin
import os

# Python 2 reads a line of text with raw_input(); where that name exists,
# use it under the name input so the prompt below works either way.
try:
    input = raw_input
except NameError:
    pass

# Prompt repeatedly until the answer is a path that exists.
filename = ""
while True:
    if os.path.exists(filename):
        break
    filename = input("Enter your filename: ")

5) remove space from head and tail of a line and remove line break
import re

with open(filename, 'rb') as fh:
    for line in fh:
        #line = line.replace('\n', '')
        line = re.sub("[\n\r]", '', line)

        # line = re.sub('(?:^\s*)|(?:\s*$)', '', line)
        line = re.sub('^\s*|\s*$', '', line)
        print line

6) dictionary
#loop the dictionary
# NOTE: nodes is the dictionary built by the "dictionary of list" snippets below.
# NOTE(review): the rest of this file uses the Python 2 print statement, under
# which print(k, v) prints the tuple (k, v) - confirm that this is intended.

# iterate over the keys directly
for k in nodes:
    print(k, nodes[k])

# same keys; in Python 2 keys() first builds a list of them
for k in nodes.keys():
    print(k, nodes[k])

# same keys through an iterator (Python 2 only)
for k in nodes.iterkeys():
    print(k, nodes[k])

# values only (Python 2 only)
for v in nodes.itervalues():
    print v

# key/value pairs together (Python 2 only)
for k, v in nodes.iteritems():
    print(k, v)

# dictionary of list
import re
import collections
# Map the first field of every two-field line to the list of second fields
# seen for it; defaultdict(list) creates the empty list on first access.
nodes = collections.defaultdict(list)
with open(filename, 'rb') as fh:
    for line in fh:
        # split() without arguments splits on runs of whitespace and ignores
        # leading/trailing whitespace. re.split('\s+', line) returned an extra
        # empty string for the trailing newline, so every normal "key value\n"
        # line had three elements and was skipped.
        elements = line.split()
        if len(elements) != 2:
            continue
        key, value = elements
        nodes[key].append(value)

# dictionary of list
import re
# Same mapping as above, built with a plain dictionary.
nodes = {}
with open(filename, 'rb') as fh:
    for line in fh:
        # split() without arguments ignores the trailing newline;
        # re.split('\s+', line) produced a third, empty element for it and
        # therefore rejected every normal "key value\n" line.
        elements = line.split()
        if len(elements) != 2:
            continue
        key, value = elements
        # setdefault() inserts the empty list the first time a key is seen,
        # replacing the deprecated has_key() test and the duplicated append.
        nodes.setdefault(key, []).append(value)

7) file operation
#open file
# the with statement closes the file when the block ends
with open(filename, 'rb') as fh:
    for line in fh:
        # line still carries its own line break and print appends another one
        print line

8) import module
# 1) import one name: sqrt is usable without a prefix
from numpy import sqrt
result = sqrt(2)
print result

# 2) import every public name of numpy into this namespace
#    (short to type, but it can silently replace names already defined here)
from numpy import *
result = sqrt(2)
print result

# 3) import the module: every use is qualified with the module name
import numpy
result = numpy.sqrt(2)
print result

# 4) import the module under a shorter alias
import numpy as np
result = np.sqrt(2)
print result

9) use module numpy to operate on array
import numpy
#numpy.sqrt(a) result is the same as that of numpy.sqrt(a_arr)

# plain Python list: a*2 repeats the list, numpy.sqrt works element by element
a = [1, 2, 3]
print a, a*2, numpy.sqrt(a)
#result: [1, 2, 3] [1, 2, 3, 1, 2, 3] [ 1.          1.41421356  1.73205081]

# numpy array: a_arr*2 multiplies every element by 2
a_arr = numpy.array(a)
print a_arr, a_arr*2, numpy.sqrt(a_arr)
#result: [1 2 3] [2 4 6] [ 1.          1.41421356  1.73205081]

10) python module path
#package test/test_module_path.py
def print_msg():
    # fixed message; seeing it printed shows that the import below succeeded
    print "test PYTHONPATH and sys.path"

#script
# (a separate file that imports the module defined above)
from test_module_path import *
print_msg()

An ImportError will occur because the directory test is not on Python's module search path (sys.path).
There are two ways to solve the issue:
    i) add module path to environmental variable PYTHONPATH:
       export PYTHONPATH=./test

    ii) append the directory test to variable sys.path in script:
        import sys
        sys.path.append("./test")
        from test_module_path import *
        print_msg()

11) use execfile to execute an external script
#script display.py
print "test execfile"

#main script
# execfile() is a Python 2 built-in that runs the statements of the named file
execfile("display.py")

12) variable __name__
# __name__ equals "__main__" only when this file is run directly as a script
if __name__ == "__main__":
    print "executing"

# The code under "if __name__" is skipped when the file is imported as a module
# and executed when the file is run as a script.

13) function arguments
#!/usr/bin/python

def func_arg1(*args):
    for i in args:
        print i

def func_arg2(**args):
    for i in args:
        print i, args[i]

func_arg1(1, 2, "3", "4")
func_arg2(name=1, nation="2")

101) python OOP
##################################################################
#MyClasses.py
##################################################################
import turtle
turtle.up()
import math
import random

# Point class definition
class Point(object):
    """A mutable 2-D point with ``x`` and ``y`` attributes.

    Supports ``+`` (component-wise sum), ``==`` / ``!=`` (coordinate
    comparison), ``str()`` and drawing itself with turtle.
    """

    def __init__(self, x_coord=0, y_coord=0):
        self.x = x_coord
        self.y = y_coord

    # Access methods
    def set_x(self, new_x):
        self.x = new_x

    def set_y(self, new_y):
        self.y = new_y

    def set_xy(self, new_x, new_y):
        self.x = new_x
        self.y = new_y

    def get_x(self):
        return self.x

    def get_y(self):
        return self.y

    def get_xy(self):
        """Return the coordinates as an ``(x, y)`` tuple."""
        return (self.x, self.y)

    # overloading the addition (+) operator
    def __add__(self, other_point):
        """Return a new Point whose coordinates are the component-wise sum."""
        return Point(self.x + other_point.x, self.y + other_point.y)

    def __eq__(self, other_point):
        """Two points are equal when both coordinates are equal.

        Objects without ``x``/``y`` attributes yield NotImplemented, so that
        comparing a Point with an unrelated object is False instead of
        raising AttributeError.
        """
        try:
            return (self.x == other_point.x) and (self.y == other_point.y)
        except AttributeError:
            return NotImplemented

    def __ne__(self, other_point):
        """Inverse of ``__eq__``; Python 2 does not derive it automatically."""
        result = self.__eq__(other_point)
        if result is NotImplemented:
            return result
        return not result

    # Equality depends on mutable coordinates, so instances are unhashable.
    __hash__ = None

    def dist(self, other_point):
        """Return the Euclidean distance to ``other_point``."""
        return math.hypot(self.x - other_point.x, self.y - other_point.y)

    # overloading the str() method
    def __str__(self):
        """Return the public attributes as "name:value " pairs, sorted by name."""
        return "".join(
            "%s:%s " % (key, value)
            for key, value in sorted(self.__dict__.items())
            if not key.startswith("_")  # skip names that start with _
        )

    def draw(self):
        """Move the turtle to this point and put a dot there."""
        turtle.setup(601, 601, -300, -300)
        turtle.goto(self.x, self.y)
        turtle.dot()
##################################################################
# Polygon class
class Polygon:
    """Closed polygon described by an ordered list of points.

    Every point must provide ``draw()`` and ``dist(other)``.
    """

    def __init__(self, points=None):
        """Store a private copy of ``points`` (any iterable; default: no points)."""
        print("creating an instance of class %s" % self.__class__.__name__)
        # Copy, so that later changes to the caller's sequence do not alter
        # this polygon.
        self.point_list = list(points) if points is not None else []

    def draw(self):
        """Draw the outline with turtle and close it at the first point."""
        if not self.point_list:
            # nothing to draw; also avoids an IndexError on point_list[0]
            return
        turtle.setup(601, 601, -300, -300)
        # pen is up while moving to the first point, down for all later ones
        turtle.penup()
        for p in self.point_list:
            p.draw()
            turtle.pendown()
        # go back to the first point to close the polygon
        self.point_list[0].draw()

    def num_points(self):
        """Return the number of corner points."""
        return len(self.point_list)

    def length(self):
        """Return the perimeter: the sum of the lengths of all sides."""
        pts = self.point_list
        count = len(pts)
        total_len = 0
        for i in range(count):
            # the successor of the last point is the first one
            total_len += pts[i].dist(pts[(i + 1) % count])
        return total_len

##################################################################
#Rectangle
class Rectangle(Polygon):
    """Axis-aligned rectangle given by its lower-left corner, width and height."""

    def __init__(self, lower_left_corner, width, height):
        self.lower_left_corner = lower_left_corner
        self.width = width
        self.height = height
        # offsets of the other three corners from the lower-left one, in the
        # order: lower right, upper right, upper left
        offsets = [(width, 0), (width, height), (0, height)]
        corners = [lower_left_corner]
        for dx, dy in offsets:
            corners.append(lower_left_corner + Point(dx, dy))
        # let Polygon store the four corners
        Polygon.__init__(self, corners)

##################################################################
# Square 
class Square(Rectangle):
    """Square given by its lower-left corner and side length.

    A square is built as a Rectangle with equal width and height, so it has
    the same four corners as before and additionally exposes
    ``lower_left_corner``, ``width`` and ``height`` - the attributes that
    ArtzyRectangle reads from the shape it is given.
    """

    def __init__(self, lower_left_corner=None, side_length = 0):
        # Create the default corner per call: a Point is mutable, and a
        # default instance in the signature would be shared by all squares.
        if lower_left_corner is None:
            lower_left_corner = Point(0, 0)
        Rectangle.__init__(self, lower_left_corner, side_length, side_length)
    #do NOT define a draw() method for the Square class
    #the draw() method of class Polygon is inherited and used directly

##################################################################
# ArtzyRectangle
class ArtzyRectangle(Rectangle):
    """Rectangle drawn with a random line width and a random fill colour."""

    def __init__(self, rec):
        """Copy the geometry of ``rec``.

        ``rec`` must provide ``lower_left_corner``, ``width`` and ``height``.
        """
        #call Rectangle init function to instance a rectangle
        Rectangle.__init__(self, rec.lower_left_corner, rec.width, rec.height)

    #overload draw() method
    def draw(self):
        """Draw the filled rectangle with turtle."""
        #get the width of the line of the rectangle
        line_width = random.randint(1, 5)

        #get the color that is used to fill the rectangle: "#RRGGBB"
        #randint() includes both end points, so the upper bound must be 255;
        #256 would format as three hex digits and give an invalid colour
        color = "#" + "".join("%02X" % random.randint(0, 255) for _ in range(3))

        #set the pen according to the line width and fill color obtained above
        turtle.pen(fillcolor=color, pensize=line_width)

        #start recording the shape that is to be filled
        turtle.begin_fill()

        turtle.penup()
        for p in self.point_list:
            p.draw()
            turtle.pendown()
        # go back to first point to close the polygon
        self.point_list[0].draw()

        #fill the shape drawn since begin_fill()
        turtle.end_fill()

################################################################## 
#test.py
################################################################## 
from MyClasses import Point
from MyClasses import Polygon
from MyClasses import Square
from MyClasses import Rectangle
# the class defined above is called ArtzyRectangle (there is no RandomRectangle)
from MyClasses import ArtzyRectangle

sq = Square(Point(-100, -20), 50)
sq.draw()

# ArtzyRectangle copies lower_left_corner, width and height from its argument,
# so hand it a Rectangle covering the same area as the square
art_rec = ArtzyRectangle(Rectangle(Point(-100, -20), 50, 50))
art_rec.draw()

102) walk
from os import walk
duplicate_files = []
file_list1 = []
file_list2 = []

#put the name of each file below folder1 into list file_list1
for (dirpath, dirnames, filenames) in walk(folder1):
    file_list1 += filenames

#put the name of each file below folder2 into list file_list2
for (dirpath, dirnames, filenames) in walk(folder2):
    file_list2 += filenames

#a set gives constant-time membership tests, so every name of file_list1 is
#checked once instead of being compared with every name of file_list2; a
#name is also no longer reported once per matching entry of file_list2
names_in_folder2 = set(file_list2)
duplicate_files = [f1 for f1 in file_list1 if f1 in names_in_folder2]

103) get total size of all files in one folder
from os.path import getsize, join
from os import walk

# Add up the size of every file found in folder and all of its sub-folders.
total_size_in_bytes = 0
for (dirpath, dirnames, filenames) in walk(folder):
    for f in filenames:
        # join() uses the path separator of the platform instead of a fixed "/"
        total_size_in_bytes += getsize(join(dirpath, f))

104) glob
import glob
# list the entries directly inside folder that match the pattern folder/*
files_in_folder = glob.glob(folder + '/*')
# note: this prints the list of matched paths itself, not their number
print "found", files_in_folder, "files in folder", folder

105) read csv file
import csv
x = []
y = []

#open the csv file
with open('filename.csv', 'rb') as csvfile:
    #give the read handler to csvreader
    csvreader = csv.reader(csvfile)
    #skip the first (header) line; the default None keeps an empty file from
    #raising StopIteration
    next(csvreader, None)
    #collect the second and third column of every remaining row
    for row in csvreader:
        x.append(row[1])
        y.append(row[2])

106) counting sort
#!/usr/bin/python

import collections

data = [4, 2, 1, 3, 7, 2, 9, 8, 1, 5, 6, 9, 1, 3, 2, 7]

# Step 1: put equal values into the same bucket.
data_cnt = collections.defaultdict(list)
for i in data:
    data_cnt[i].append(i)

# Step 2: emit the buckets in ascending key order. Walking the key range
# explicitly is what makes the result sorted; the iteration order of a
# dictionary is not guaranteed to be ascending.
sorted_data = []
if data:
    for i in range(min(data), max(data) + 1):
        if i in data_cnt:
            sorted_data.extend(data_cnt[i])

print(sorted_data)

110) thread: thread without arg
#!/usr/bin/python

import thread
from time import sleep, ctime

def loop0():
    # worker: report the start time, block for 4 seconds, report the end time
    print 'start loop 0 at:', ctime()
    sleep(4)
    print 'loop 0 done at:', ctime()

def loop1():
    # worker: same as loop0, but blocks for 2 seconds
    print 'start loop 1 at:', ctime()
    sleep(2)
    print 'loop 1 done at:', ctime()

def main():
    print 'starting at:', ctime()
    # start_new_thread(function, args): the empty tuple means "no arguments"
    thread.start_new_thread(loop0, ())
    thread.start_new_thread(loop1, ())
    # nothing here waits for the workers explicitly; the only thing holding
    # main() back is this sleep, which is longer than the slowest worker (4 s)
    sleep(6)
    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

111) thread: thread with arg
#!/usr/bin/python

import thread
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec, lock):
    print 'start loop', nloop, 'at:', ctime()
    sleep(nsec)
    print 'loop', nloop, 'done at:', ctime()
    lock.release()

def main():
    print 'starting at:', ctime()
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        thread.start_new_thread(loop, (i, loops[i], locks[i]))

    for i in nloops:
        while locks[i].locked(): pass

    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

112) threading.thread
#!/usr/bin/python

import threading
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec):
    print 'start loop', nloop, 'at:', ctime()
    sleep(nsec)
    print 'loop', nloop, 'done at:', ctime()

def main():
    print 'starting at:', ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

113) threading.thread: use object of class as thread
#!/usr/bin/python

import threading
from time import sleep, ctime

loops = [4, 2]

class ThreadFunc(object):
    """Callable object that bundles a function with its arguments.

    Calling the instance calls ``func(*args)``, which makes it usable as the
    ``target`` of a ``threading.Thread``.
    """

    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        # func(*args) is the direct spelling of the deprecated apply(func, args)
        self.func(*self.args)

def loop(nloop, nsec):
    print 'start loop', nloop, 'at:', ctime()
    sleep(nsec)
    print 'loop', nloop, 'done at:', ctime()

def main():
    print 'starting at:', ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

114) extend class threading.Thread
#!/usr/bin/python

import threading
from time import sleep, ctime

loops = [4, 2]

class MyThread(threading.Thread):
    """Thread subclass whose ``run()`` calls ``func(*args)``."""

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        # func(*args) is the direct spelling of the deprecated apply(func, args)
        self.func(*self.args)

def loop(nloop, nsec):
    print 'start loop', nloop, 'at:', ctime()
    sleep(nsec)
    print 'loop', nloop, 'done at:', ctime()

def main():
    print 'starting at:', ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

115) extend class threading.Thread and keep the return value of the function
#!/usr/bin/python

import threading
from time import sleep, ctime

loops = [4, 2]

class MyThread(threading.Thread):
    """Thread subclass that runs ``func(*args)`` and keeps its return value."""

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
        # defined up front so that getResult() cannot raise AttributeError
        # when it is called before run() has stored a result
        self.res = None

    def getResult(self):
        """Return what ``func`` returned, or None if no result is stored yet."""
        return self.res

    def run(self):
        print('starting %s at: %s' % (self.name, ctime()))
        # func(*args) is the direct spelling of the deprecated apply(func, args)
        self.res = self.func(*self.args)
        print('%s finished at: %s' % (self.name, ctime()))

def loop(nloop, nsec):
    print 'start loop', nloop, 'at:', ctime()
    sleep(nsec)
    print 'loop', nloop, 'done at:', ctime()

def main():
    print 'starting at:', ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

116) threading: run functions one after another and in parallel threads
##################################################################
#MyThread.py
##################################################################
#!/usr/bin/python

import threading
from time import sleep, ctime

loops = [4, 2]

class MyThread(threading.Thread):
    """Thread subclass that runs ``func(*args)`` and keeps its return value."""

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
        # defined up front so that getResult() cannot raise AttributeError
        # when it is called before run() has stored a result
        self.res = None

    def getResult(self):
        """Return what ``func`` returned, or None if no result is stored yet."""
        return self.res

    def run(self):
        print('starting %s at: %s' % (self.name, ctime()))
        # func(*args) is the direct spelling of the deprecated apply(func, args)
        self.res = self.func(*self.args)
        print('%s finished at: %s' % (self.name, ctime()))

def loop(nloop, nsec):
    print 'start loop', nloop, 'at:', ctime()
    sleep(nsec)
    print 'loop', nloop, 'done at:', ctime()

def main():
    print 'starting at:', ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

##################################################################
#threading_test.py
##################################################################
#!/usr/bin/python

from myThread import MyThread
from time import sleep, ctime

loops = [4, 2]

def fib(x):
    sleep(0.005)
    if x < 2: return 1
    return (fib(x-2) + fib(x-1))

def fac(x):
    sleep(0.1)
    if x < 2: return 1
    return (x * fac(x-1))

def sum(x):
    sleep(0.1)
    if x < 2: return 1
    return (x + sum(x-1))

funcs = [fib, fac, sum]
n = 12

def main():
    nfuncs = range(len(funcs))

    print '*** SINGLE THREAD'
    for i in nfuncs:
        print 'starting', funcs[i].__name__, 'at:', ctime
        print funcs[i](n)
        print funcs[i].__name__, 'finished at:', ctime()

    print '\n*** MULTIPLE THREADS'
    threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (n,), funcs[i].__name__)
        threads.append(t)

    for i in nfuncs:
        threads[i].start()

    for i in nfuncs:
        threads[i].join()
        print threads[i].getResult()

    print 'all DONE'

if __name__ == '__main__':
    main()

117) threading: producer and consumer sharing a Queue
##################################################################
#MyThread.py
##################################################################
#!/usr/bin/python

import threading
from time import sleep, ctime

loops = [4, 2]

class MyThread(threading.Thread):
    """Thread subclass that runs ``func(*args)`` and keeps its return value."""

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
        # defined up front so that getResult() cannot raise AttributeError
        # when it is called before run() has stored a result
        self.res = None

    def getResult(self):
        """Return what ``func`` returned, or None if no result is stored yet."""
        return self.res

    def run(self):
        print('starting %s at: %s' % (self.name, ctime()))
        # func(*args) is the direct spelling of the deprecated apply(func, args)
        self.res = self.func(*self.args)
        print('%s finished at: %s' % (self.name, ctime()))

def loop(nloop, nsec):
    print 'start loop', nloop, 'at:', ctime()
    sleep(nsec)
    print 'loop', nloop, 'done at:', ctime()

def main():
    print 'starting at:', ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print 'all DONE at:', ctime()

if __name__ == '__main__':
    main()

##################################################################
#threading_test.py
##################################################################
#!/usr/bin/python

from random import randint
from time import sleep, ctime
from Queue import Queue
from myThread import MyThread

def writeQ(queue):
    print 'producint object for Q...', queue.put('XXX', 1)
    print "size now", queue.qsize()

def readQ(queue):
    var = queue.get(1)
    print 'consumed object from Q... size now', queue.qsize()

def writer(queue, loops):
    for i in range(loops):
        writeQ(queue)
        sleep(randint(1, 3))

def reader(queue, loops):
    for i in range(loops):
        readQ(queue)
        sleep(randint(2, 5))

funcs = [writer, reader]
nfuncs = range(len(funcs))

def main():
    nloops = randint(2, 5)
    q = Queue(32)

    threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)

    for i in nfuncs:
        threads[i].start()

    for i in nfuncs:
        threads[i].join()

    print 'all DONE'

if __name__ == '__main__':
    main()

118) threading.RLock
#!/usr/bin/python

import random
from time import sleep, ctime
import threading

class data:
    def __init__(self):
        self.__lock = threading.RLock()

    def test1(self, msg):
        sleep(2)
        print msg, ' test1_1', ctime()
        with self.__lock:
            sleep(2)
            print msg, ' test1_2', ctime()

    def test2(self, msg):
        sleep(2)
        print msg, ' test2_1', ctime()
        with self.__lock:
            sleep(2)
            print ctime()
            print msg, ' test2_2', ctime()

data_obj = data()

threads = []
t1 = threading.Thread(target=data_obj.test1, args=(["test1"]))
threads.append(t1)
t2 = threading.Thread(target=data_obj.test2, args=(["test2"]))
threads.append(t2)

for i in range(2):
    threads[i].start()

for i in range(2):
    threads[i].join()

119) threading.Timer
#!/usr/bin/python

import random
import time
import threading

def print_msg(msg):
    # callback of the module-level timer created below
    print msg, " test1 ", time.ctime()

class data(threading.Thread):
    # thread whose run() method only schedules a one-second timer
    def __init__(self):
        threading.Thread.__init__(self)
        # stores the Timer class itself (not an instance); run() instantiates it
        self._timer = threading.Timer

    def run(self):
        # Timer(interval, function, args): call self.print_msg("test") after 1 second
        t = self._timer(1, self.print_msg, ["test"])
        t.start()

    def print_msg(self, msg):
        # callback of the timer created in run()
        print msg, " test2 ", time.ctime()

# plain usage: call print_msg("test") once, one second after start()
t = threading.Timer(1, print_msg, ["test"])
t.start()

# same idea, but the timer is created from inside another thread
data_obj = data()
data_obj.start()

# keep the main thread busy for 3 seconds, longer than both 1-second timers
time.sleep(3)

No comments:

Post a Comment