#
# Copyright (c) 2017, Stephanie Wehner and Axel Dahlberg
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. All advertising materials mentioning features or use of this software
# must display the following acknowledgement:
# This product includes software developed by Stephanie Wehner, QuTech.
# 4. Neither the name of the QuTech organization nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER ''AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import socket, struct, os, sys, time, math
from SimulaQron.general.hostConfig import *
from SimulaQron.cqc.backend.cqcHeader import *
from SimulaQron.cqc.backend.entInfoHeader import *
[docs]class CQCConnection:
_appIDs=[]
def __init__(self,name,cqcFile=None,appFile=None,appID=0):
"""
Initialize a connection to the cqc server.
- **Arguments**
:name: Name of the host.
:cqcFile: Path to cqcFile. If None, '$NETSIM/config/cqcNodes.cfg is used.
:appFile: Path to appFile. If None, '$NETSIM/config/appNodes.cfg is used.
:appID: Application ID, defaults to a nonused ID.
"""
# Host name
self.name=name
# Which appID
if appID in self._appIDs:
raise ValueError("appID={} is already in use".format(appID))
self._appID=appID
self._appIDs.append(self._appID)
# Buffer received data
self.buf=None
# ClassicalServer
self._classicalServer=None
# Classical connections in the application network
self._classicalConn={}
# This file defines the network of CQC servers interfacing to virtual quantum nodes
if cqcFile==None:
self.cqcFile = os.environ.get('NETSIM') + "/config/cqcNodes.cfg"
# Read configuration files for the cqc network
self._cqcNet = networkConfig(self.cqcFile)
# Host data
if self.name in self._cqcNet.hostDict:
myHost = self._cqcNet.hostDict[self.name]
else:
raise ValueError("Host name '{}' is not in the cqc network".format(name))
#Get IP of correct form
myIP=socket.inet_ntoa(struct.pack("!L",myHost.ip))
#Connect to cqc server
self._s=None
try:
self._s=socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
except socket.error:
raise RuntimeError("Could not connect to cqc server '{}'".format(name))
try:
self._s.connect((myIP,myHost.port))
except socket.error:
self._s.close()
raise RuntimeError("Could not connect to cqc server '{}'".format(name))
# This file defines the application network
if appFile==None:
self.appFile = os.environ.get('NETSIM') + "/config/appNodes.cfg"
# Read configuration files for the application network
self._appNet = networkConfig(self.appFile)
def __str__(self):
return "Socket to cqc server '{}'".format(self.name)
[docs] def get_appID(self):
"""
Returns the application ID.
"""
return self._appID
[docs] def close(self):
"""
Closes the connection.
"""
self._s.close()
self._appIDs.remove(self._appID)
self.closeClassicalServer()
for name in list(self._classicalConn):
self.closeClassicalChannel(name)
[docs] def startClassicalServer(self):
"""
Sets up a server for the application communication, if not already set up.
"""
if not self._classicalServer:
#Get host data
myHost=self._appNet.hostDict[self.name]
# Setup server
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind((myHost.hostname,myHost.port))
s.listen(1)
(conn,addr)=s.accept()
self._classicalServer=conn
[docs] def closeClassicalServer(self):
if self._classicalServer:
self._classicalServer.close()
self._classicalServer=None
[docs] def recvClassical(self,timout=1, msg_size=1024):
if not self._classicalServer:
self.startClassicalServer()
for _ in range(10*timout):
msg=self._classicalServer.recv(msg_size)
if len(msg)>0:
return msg
time.sleep(0.1)
[docs] def openClassicalChannel(self,name,timout=1):
"""
Opens a classical connection to another host in the application network.
- **Arguments**
:name: The name of the host in the application network.
:timout: The time to try to connect to the server. When timout is reached an RuntimeError is raised.
"""
if not name in self._classicalConn:
if name in self._appNet.hostDict:
remoteHost=self._appNet.hostDict[name]
else:
raise ValueError("Host name '{}' is not in the cqc network".format(name))
connected=False
for _ in range(int(10*timout)):
try:
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((remoteHost.hostname,remoteHost.port))
connected=True
break
except:
time.sleep(0.1)
if not connected:
raise RuntimeError("Could not connect to server {}".format(name))
self._classicalConn[name]=s
[docs] def closeClassicalChannel(self,name):
"""
Closes a classical connection to another host in the application network.
- **Arguments**
:name: The name of the host in the application network.
"""
if name in self._classicalConn:
s=self._classicalConn.pop(name)
s.close()
[docs] def sendClassical(self,name,msg,timout=1):
"""
Sends a classical message to another host in the application network.
- **Arguments**
:name: The name of the host in the application network.
:msg: The message to send. Should be either a int in range(0,256) or a list of such ints.
:timout: The time to try to connect to the server. When timout is reached an RuntimeError is raised.
"""
if not name in self._classicalConn:
self.openClassicalChannel(name)
try:
to_send=[int(msg)]
except TypeError:
to_send=msg
self._classicalConn[name].send(bytes(to_send))
[docs] def sendSimple(self,tp):
"""
Sends a simple message to the cqc server, for example a HELLO message if tp=CQC_TP_HELLO.
"""
hdr=CQCHeader()
hdr.setVals(CQC_VERSION,tp,self._appID,0)
msg=hdr.pack()
self._s.send(msg)
[docs] def sendCommand(self,qID,command,notify=1,block=1,action=0):
"""
Sends a simple message and command message to the cqc server.
- **Arguments**
:qID: qubit ID
:command: Command to be executed, eg CQC_CMD_H
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:action: Are there more commands to be executed
"""
#Send Header
hdr=CQCHeader()
hdr.setVals(CQC_VERSION,CQC_TP_COMMAND,self._appID,CQC_CMD_HDR_LENGTH)
msg=hdr.pack()
self._s.send(msg)
#Send Command
cmd_hdr=CQCCmdHeader()
cmd_hdr.setVals(qID,command,notify,block,action)
cmd_msg=cmd_hdr.pack()
self._s.send(cmd_msg)
[docs] def sendCmdXtra(self,qID,command,notify=1,block=1,action=0,xtra_qID=0,step=0,remote_appID=0,remote_node=0,remote_port=0,cmd_length=0):
"""
Sends a simple message, command message and xtra message to the cqc server.
- **Arguments**
:qID: qubit ID
:command: Command to be executed, eg CQC_CMD_H
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:action: Are there more commands to be executed
:xtra_qID: Extra qubit ID for for example CNOT
:step: Defines the angle of rotation.
:remote_appID: Application ID of remote host
:remote_node: ip of remote host in cqc network
:remote_port: port of remote host in cqc network
:cmd_length: length of extra commands
"""
#Send Header
hdr=CQCHeader()
hdr.setVals(CQC_VERSION,CQC_TP_COMMAND,self._appID,CQC_CMD_HDR_LENGTH+CQC_CMD_XTRA_LENGTH)
msg=hdr.pack()
self._s.send(msg)
#Send Command
cmd_hdr=CQCCmdHeader()
cmd_hdr.setVals(qID,command,notify,block,action)
cmd_msg=cmd_hdr.pack()
self._s.send(cmd_msg)
#Send Xtra
xtra_hdr=CQCXtraHeader()
xtra_hdr.setVals(xtra_qID,step,remote_appID,remote_node,remote_port,cmd_length)
xtra_msg=xtra_hdr.pack()
self._s.send(xtra_msg)
[docs] def sendGetTime(self,qID,notify=1,block=1,action=0):
"""
Sends get-time message
- **Arguments**
:qID: qubit ID
:command: Command to be executed, eg CQC_CMD_H
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:action: Are there more commands to be executed
"""
#Send Header
hdr=CQCHeader()
hdr.setVals(CQC_VERSION,CQC_TP_GET_TIME,self._appID,CQC_CMD_HDR_LENGTH)
msg=hdr.pack()
self._s.send(msg)
#Send Command
cmd_hdr=CQCCmdHeader()
cmd_hdr.setVals(qID,0,notify,block,action)
cmd_msg=cmd_hdr.pack()
self._s.send(cmd_msg)
[docs] def sendFactory(self,qID,command,num_iter,notify=1,block=1,action=0,xtra_qID=0,remote_appID=0,remote_node=0,remote_port=0,cmd_length=0):
"""
Sends a factory message
- **Arguments**
:qID: qubit ID
:command: Command to be executed, eg CQC_CMD_H
:num_iter: Number of times to execute command
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:action: Are there more commands to be executed
:xtra_qID: Extra qubit ID for for example CNOT
:remote_appID: Application ID of remote host
:remote_node: ip of remote host in cqc network
:remote_port: port of remote host in cqc network
:cmd_length: length of extra commands
"""
#Send Header
hdr=CQCHeader()
hdr.setVals(CQC_VERSION,CQC_TP_FACTORY,self._appID,CQC_CMD_HDR_LENGTH+CQC_CMD_XTRA_LENGTH)
msg=hdr.pack()
self._s.send(msg)
#Send Command
cmd_hdr=CQCCmdHeader()
cmd_hdr.setVals(qID,command,notify,block,action)
cmd_msg=cmd_hdr.pack()
self._s.send(cmd_msg)
#Send Xtra
xtra_hdr=CQCXtraHeader()
xtra_hdr.setVals(xtra_qID,num_iter,remote_appID,remote_node,remote_port,cmd_length)
xtra_msg=xtra_hdr.pack()
self._s.send(xtra_msg)
[docs] def readMessage(self,maxsize=192): # WHAT IS GOOD SIZE?
"""
Receive the whole message from cqc server.
Returns (CQCHeader,None,None), (CQCHeader,CQCNotifyHeader,None) or (CQCHeader,CQCNotifyHeader,EntInfoHeader) depending on the type of message.
Maxsize is the max size of message.
"""
#Initilize checks
gotCQCHeader=False
if self.buf:
checkedBuf=False
else:
checkedBuf=True
while True:
#If buf does not contain enough data, read in more
if checkedBuf:
# Receive data
data=self._s.recv(maxsize)
# Read whatever we received into a buffer
if self.buf:
self.buf+=data
else:
self.buf=data
# If we don't have the CQC header yet, try and read it in full.
if not gotCQCHeader:
if len(self.buf) < CQC_HDR_LENGTH:
# Not enough data for CQC header, return and wait for the rest
checkedBuf=True
continue
# Got enough data for the CQC Header so read it in
gotCQCHeader = True;
rawHeader = self.buf[0:CQC_HDR_LENGTH]
currHeader = CQCHeader(rawHeader);
# Remove the header from the buffer
self.buf = self.buf[CQC_HDR_LENGTH:len(self.buf)]
# Check for error
self.check_error(currHeader)
# Check whether we already received all the data
if len(self.buf) < currHeader.length:
# Still waiting for data
checkedBuf=True
continue
else:
break
# We got all the data, read notify (and ent_info) if there is any
if currHeader.length==0:
return (currHeader,None,None)
elif currHeader.length==CQC_NOTIFY_LENGTH:
try:
rawNotifyHeader=self.buf[:CQC_NOTIFY_LENGTH]
self.buf=self.buf[CQC_NOTIFY_LENGTH:len(self.buf)]
notifyHeader=CQCNotifyHeader(rawNotifyHeader)
return (currHeader,notifyHeader,None)
except struct.error as err:
print(err)
elif currHeader.length==CQC_NOTIFY_LENGTH+ENT_INFO_LENGTH:
try:
rawNotifyHeader=self.buf[:CQC_NOTIFY_LENGTH]
self.buf=self.buf[CQC_NOTIFY_LENGTH:len(self.buf)]
notifyHeader=CQCNotifyHeader(rawNotifyHeader)
rawEntInfoHeader=self.buf[:ENT_INFO_LENGTH]
self.buf=self.buf[ENT_INFO_LENGTH:len(self.buf)]
entInfoHeader=EntInfoHeader(rawEntInfoHeader)
return (currHeader,notifyHeader,entInfoHeader)
except struct.error as err:
print(err)
else:
print("Warning: Received message of unknown length, return None")
[docs] def print_CQC_msg(self,message):
"""
Prints messsage returned by the readMessage method of CQCConnection.
"""
hdr=message[0]
notifyHdr=message[1]
entInfoHdr=message[2]
if hdr.tp==CQC_TP_HELLO:
print("CQC tells App {}: 'HELLO'".format(self.name))
elif hdr.tp==CQC_TP_EXPIRE:
print("CQC tells App {}: 'Qubit with ID {} has expired'".format(self.name,notifyHdr.qubit_id))
elif hdr.tp==CQC_TP_DONE:
print("CQC tells App {}: 'Done with command'".format(self.name))
elif hdr.tp==CQC_TP_RECV:
print("CQC tells App {}: 'Received qubit with ID {}'".format(self.name,notifyHdr.qubit_id))
elif hdr.tp==CQC_TP_EPR_OK:
# Lookup host name
remote_node=entInfoHdr.node_B
remote_port=entInfoHdr.port_B
for node in self._cqcNet.hostDict.values():
if (node.ip==remote_node) and (node.port==remote_port):
remote_name=node.name
break
print("CQC tells App {}: 'EPR created with node {}, using qubit with ID {}'".format(self.name,remote_name, notifyHdr.qubit_id))
elif hdr.tp==CQC_TP_MEASOUT:
print("CQC tells App {}: 'Measurement outcome is {}'".format(self.name,notifyHdr.outcome))
elif hdr.tp==CQC_TP_INF_TIME:
print("CQC tells App {}: 'Timestamp is {}'".format(self.name,notifyHdr.datetime))
[docs] def check_error(self,hdr):
"""
Checks if there is an error returned.
"""
self._errorHandler(hdr.tp)
def _errorHandler(self,cqc_err):
"""
Raises an error if there is an error-message
"""
if cqc_err==CQC_ERR_GENERAL:
raise CQCGeneralError("General error")
if cqc_err==CQC_ERR_NOQUBIT:
raise CQCNoQubitError("Qubit not available or no more qubits available")
if cqc_err==CQC_ERR_UNSUPP:
raise CQCUnsuppError("Sequence not supported")
if cqc_err==CQC_ERR_TIMEOUT:
raise CQCTimeoutError("Timout")
[docs] def sendQubit(self,q,name,remote_appID=0,notify=True,block=True,print_info=True):
"""
Sends qubit to another node in the cqc network. If this node is not in the network an error is raised.
- **Arguments**
:q: The qubit to send.
:Name: Name of the node as specified in the cqc network config file.
:remote_appID: The app ID of the application running on the receiving node.
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
"""
# Get receiving host
hostDict=self._cqcNet.hostDict
if name in hostDict:
recvHost=hostDict[name]
else:
raise ValueError("Host name '{}' is not in the cqc network".format(name))
#print info
if print_info:
print("App {} tells CQC: 'Send qubit with ID {} to {} and appID {}'".format(self.name,q._qID,name,remote_appID))
self.sendCmdXtra(q._qID,CQC_CMD_SEND,notify=int(notify),block=int(block),remote_appID=remote_appID,remote_node=recvHost.ip,remote_port=recvHost.port)
if notify:
message=self.readMessage()
if print_info:
self.print_CQC_msg(message)
#Deactivate qubit
q._active=False
[docs] def recvQubit(self,notify=True,block=True,print_info=True):
"""
Receives a qubit.
- **Arguments**
:q: The qubit to send.
:Name: Name of the node as specified in the cqc network config file.
:remote_appID: The app ID of the application running on the receiving node.
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
#print info
if print_info:
print("App {} tells CQC: 'Receive qubit'".format(self.name))
self.sendCommand(0,CQC_CMD_RECV,notify=int(notify),block=int(block))
# Get RECV message
message=self.readMessage()
notifyHdr=message[1]
q_id=notifyHdr.qubit_id
if print_info:
self.print_CQC_msg(message)
if notify:
message=self.readMessage()
if print_info:
self.print_CQC_msg(message)
# initialize the qubit
q=qubit(self,createNew=False,q_id=q_id)
#Activate and return qubit
q._active=True
return q
[docs] def createEPR(self,name,remote_appID=0,notify=True,block=True,print_info=True):
"""
Creates epr with other host in cqc network.
- **Arguments**
:name: Name of the node as specified in the cqc network config file.
:remote_appID: The app ID of the application running on the receiving node.
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# Get receiving host
hostDict=self._cqcNet.hostDict
if name in hostDict:
recvHost=hostDict[name]
else:
raise ValueError("Host name '{}' is not in the cqc network".format(name))
#print info
if print_info:
print("App {} tells CQC: 'Create EPR-pair with {} and appID {}'".format(self.name,name,remote_appID))
self.sendCmdXtra(0,CQC_CMD_EPR,notify=int(notify),block=int(block),remote_appID=remote_appID,remote_node=recvHost.ip,remote_port=recvHost.port)
# Get RECV message
message=self.readMessage()
notifyHdr=message[1]
entInfoHdr=message[2]
q_id=notifyHdr.qubit_id
if print_info:
self.print_CQC_msg(message)
if notify:
message=self.readMessage()
if print_info:
self.print_CQC_msg(message)
# initialize the qubit
q=qubit(self,createNew=False,q_id=q_id,entInfo=entInfoHdr)
#Activate and return qubit
q._active=True
return q
[docs] def recvEPR(self,notify=True,block=True,print_info=True):
"""
Receives a qubit from an EPR-pair generated with another node.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
#print info
if print_info:
print("App {} tells CQC: 'Receive half of EPR'".format(self.name))
self.sendCommand(0,CQC_CMD_EPR_RECV,notify=int(notify),block=int(block))
# Get RECV message
message=self.readMessage()
notifyHdr=message[1]
entInfoHdr=message[2]
q_id=notifyHdr.qubit_id
if print_info:
self.print_CQC_msg(message)
if notify:
message=self.readMessage()
if print_info:
self.print_CQC_msg(message)
# initialize the qubit
q=qubit(self,createNew=False,q_id=q_id,entInfo=entInfoHdr)
#Activate and return qubit
q._active=True
return q
[docs] def tomography(self,preparation,iterations,progress=True):
"""
Does a tomography on the output from the preparation specified.
The frequencies from X, Y and Z measurements are returned as a tuple (f_X,f_Y,f_Z).
- **Arguments**
:preparation: A function that takes a CQCConnection as input and prepares a qubit and returns this (and preferably sets print_info=False)
:iterations: Number of measurements in each basis.
:progress_bar: Displays a progress bar
"""
accum_outcomes=[0,0,0]
if progress:
bar=progress_bar(3*iterations)
# Measure in X
for _ in range(iterations):
# Progress bar
if progress:
bar.increase()
# prepare and measure
q=preparation(self)
q.H(print_info=False)
m=q.measure(print_info=False)
accum_outcomes[0]+=m
# Measure in Y
for _ in range(iterations):
# Progress bar
if progress:
bar.increase()
# prepare and measure
q=preparation(self)
q.K(print_info=False)
m=q.measure(print_info=False)
accum_outcomes[1]+=m
# Measure in Z
for _ in range(iterations):
# Progress bar
if progress:
bar.increase()
# prepare and measure
q=preparation(self)
m=q.measure(print_info=False)
accum_outcomes[2]+=m
if progress:
bar.close()
del bar
freqs=map(lambda x:x/iterations,accum_outcomes)
return list(freqs)
[docs] def test_preparation(self,preparation,exp_values,conf=2,iterations=100,progress=True):
"""
Test the preparation of a qubit.
Returns True if the expected values are inside the confidence interval produced from the data received from the tomography function
- **Arguments**
:preparation: A function that takes a CQCConnection as input and prepares a qubit and returns this (and preferably sets print_info=False)
:exp_values: The expected values for measurements in the X, Y and Z basis.
:conf: Determines the confidence region (+/- conf/sqrt(iterations) )
:iterations: Number of measurements in each basis.
:progress_bar: Displays a progress bar
"""
epsilon=conf/math.sqrt(iterations)
freqs=self.tomography(preparation,iterations,progress=progress)
for i in range(3):
if abs(freqs[i]-exp_values[i])>epsilon:
return False
return True
[docs]class progress_bar:
def __init__(self,maxitr):
self.maxitr=maxitr
self.itr=0
print("")
self.update()
[docs] def increase(self):
self.itr+=1
self.update()
[docs] def update(self):
procent=int(100*self.itr/self.maxitr)
sys.stdout.write('\r')
sys.stdout.write("[%-100s] %d%%" % ('='*procent,procent))
sys.stdout.flush()
[docs] def close(self):
print("")
[docs]class CQCGeneralError(Exception):
pass
[docs]class CQCNoQubitError(Exception):
pass
[docs]class CQCUnsuppError(Exception):
pass
[docs]class CQCTimeoutError(Exception):
pass
[docs]class CQCInuseError(Exception):
pass
[docs]class QubitNotActiveError(Exception):
pass
[docs]class qubit:
"""
A qubit.
"""
def __init__(self,cqc,notify=True,block=True,print_info=True,createNew=True,q_id=None, entInfo=None):
"""
Initializes the qubit. The cqc connection must be given.
If notify, the return message is received before the method finishes.
createNew is set to False when we receive a qubit.
- **Arguments**
:cqc: The CQCconnection used
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
:createNew: If NEW-message should be sent, used internally
:q_id: Qubit id, used internally if createNew
:entInfo: Entanglement information, if qubit is part of EPR-pair
"""
#Cqc connection
self._cqc=cqc
# Active qubit
if createNew:
self._active=True
else:
self._active=False
if createNew:
#print info
if print_info:
print("App {} tells CQC: 'Create qubit'".format(self._cqc.name))
# Create new qubit at the cqc server
self._cqc.sendCommand(0,CQC_CMD_NEW,notify=int(notify),block=int(block))
#Get qubit id
message=self._cqc.readMessage()
try:
notifyHdr=message[1]
self._qID=notifyHdr.qubit_id
except AttributeError:
raise CQCGeneralError("Didn't receive the qubit id")
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
else:
self._qID=q_id
# Entanglement information
self._entInfo=entInfo
def __str__(self):
if self._active:
return "Qubit at the node {}".format(self._cqc.name)
else:
return "Not active qubit"
[docs] def get_entInfo(self):
return self._entInfo
[docs] def print_entInfo(self):
if self._entInfo:
print(self._entInfo.printable())
else:
print("No entanglement information")
[docs] def set_entInfo(self,entInfo):
self._entInfo=entInfo
[docs] def check_active(self):
"""
Checks if the qubit is active
"""
if not self._active:
raise QubitNotActiveError("Qubit is not active, has either been sent, measured or not recieved")
[docs] def I(self,notify=True,block=True,print_info=True):
"""
Performs an identity gate on the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Do nothing with qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_I,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def X(self,notify=True,block=True,print_info=True):
"""
Performs a X on the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform X to qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_X,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def Y(self,notify=True,block=True,print_info=True):
"""
Performs a Y on the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform Y to qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_Y,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def Z(self,notify=True,block=True,print_info=True):
"""
Performs a Z on the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform Z to qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_Z,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def T(self,notify=True,block=True,print_info=True):
"""
Performs a T gate on the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform T to qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_T,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def H(self,notify=True,block=True,print_info=True):
"""
Performs a Hadamard on the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform H to qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_H,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def K(self,notify=True,block=True,print_info=True):
"""
Performs a K gate on the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform K to qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_K,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def rot_X(self,step,notify=True,block=True,print_info=True):
"""
Applies rotation around the x-axis with the angle of step*2*pi/256 radians.
If notify, the return message is received before the method finishes.
- **Arguments**
:step: Determines the rotation angle in steps of 2*pi/256
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform X-rot (angle {}*2pi/256) to qubit with ID {}'".format(self._cqc.name,step,self._qID))
self._cqc.sendCmdXtra(self._qID,CQC_CMD_ROT_X,step=step,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def rot_Y(self,step,notify=True,block=True,print_info=True):
"""
Applies rotation around the y-axis with the angle of step*2*pi/256 radians.
If notify, the return message is received before the method finishes.
- **Arguments**
:step: Determines the rotation angle in steps of 2*pi/256
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform Y-rot (angle {}*2pi/256) to qubit with ID {}'".format(self._cqc.name,step,self._qID))
self._cqc.sendCmdXtra(self._qID,CQC_CMD_ROT_Y,step=step,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def rot_Z(self,step,notify=True,block=True,print_info=True):
"""
Applies rotation around the z-axis with the angle of step*2*pi/256 radians.
If notify, the return message is received before the method finishes.
- **Arguments**
:step: Determines the rotation angle in steps of 2*pi/256
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform Z-rot (angle {}*2pi/256) to qubit with ID {}'".format(self._cqc.name,step,self._qID))
self._cqc.sendCmdXtra(self._qID,CQC_CMD_ROT_Z,step=step,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def cnot(self,target,notify=True,block=True,print_info=True):
"""
Applies a cnot onto target.
Target should be a qubit-object with the same cqc connection.
If notify, the return message is received before the method finishes.
- **Arguments**
:target: The target qubit
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform CNOT to qubits with IDs {}(control) {}(target)'".format(self._cqc.name,self._qID,target._qID))
self._cqc.sendCmdXtra(self._qID,CQC_CMD_CNOT,notify=int(notify),block=int(block),xtra_qID=target._qID)
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def cphase(self,target,notify=True,block=True,print_info=True):
"""
Applies a cphase onto target.
Target should be a qubit-object with the same cqc connection.
If notify, the return message is received before the method finishes.
- **Arguments**
:target: The target qubit
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Perform CPHASE to qubits with IDs {}(control) {}(target)'".format(self._cqc.name,self._qID,target))
self._cqc.sendCmdXtra(self._qID,CQC_CMD_CPHASE,notify=int(notify),block=int(block),xtra_qID=target._qID)
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def measure(self,inplace=False,block=True,print_info=True):
"""
Measures the qubit in the standard basis and returns the measurement outcome.
If now MEASOUT message is received, None is returned.
If inplace=False, the measurement is destructive and the qubit is removed from memory.
If inplace=True, the qubit is left in the post-measurement state.
- **Arguments**
:inplace: If false, measure destructively.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Measure qubit with ID {}'".format(self._cqc.name,self._qID))
if inplace:
self._cqc.sendCommand(self._qID,CQC_CMD_MEASURE_INPLACE,notify=0,block=int(block))
else:
self._cqc.sendCommand(self._qID,CQC_CMD_MEASURE,notify=0,block=int(block))
#Return measurement outcome
message=self._cqc.readMessage()
if not inplace:
self._active=False
try:
notifyHdr=message[1]
return notifyHdr.outcome
except AttributeError:
return None
[docs] def reset(self,notify=True,block=True,print_info=True):
"""
Resets the qubit.
If notify, the return message is received before the method finishes.
- **Arguments**
:nofify: Do we wish to be notified when done.
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Reset qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendCommand(self._qID,CQC_CMD_RESET,notify=int(notify),block=int(block))
if notify:
message=self._cqc.readMessage()
if print_info:
self._cqc.print_CQC_msg(message)
[docs] def getTime(self,block=True,print_info=True):
"""
Returns the time information of the qubit.
If now INF_TIME message is received, None is returned.
- **Arguments**
:block: Do we want the qubit to be blocked
:print_info: If info should be printed
"""
# check if qubit is active
self.check_active()
#print info
if print_info:
print("App {} tells CQC: 'Return time-info of qubit with ID {}'".format(self._cqc.name,self._qID))
self._cqc.sendGetTime(self._qID,notify=0,block=int(block))
# Return time-stamp
message=self._cqc.readMessage()
try:
notifyHdr=message[1]
return notifyHdr.datetime
except AttributeError:
return None